Command增删功能调试通过。

This commit is contained in:
zhengchao
2017-07-05 20:58:38 +08:00
parent 5d765975d1
commit 1936dd60f2
7 changed files with 189 additions and 83 deletions

View File

@@ -13,7 +13,7 @@ const char* maat_redis_monitor="MAAT_REDIS_MONITOR";
const char* maat_redis_command="MAAT_REDIS_COMMAND";
const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"};
const char* rm_status_key="MAAT_UPDATE_STATUS";
const int MAAT_REDIS_SYNC_TIME=30*60;
struct serial_rule_t //rm= Redis Maat
@@ -32,6 +32,39 @@ struct _Maat_cmd_inner_t
int region_size[MAX_EXPR_ITEM_NUM];
struct _Maat_cmd_inner_t* next;
};
int _wrap_redisGetReply(redisContext *c, redisReply **reply)
{
return redisGetReply(c, (void **)reply);
}
redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...)
{
va_list ap;
void *reply = NULL;
va_start(ap,format);
reply = redisvCommand(c,format,ap);
va_end(ap);
return (redisReply *)reply;
}
long long read_redis_integer(const redisReply* reply)
{
switch(reply->type)
{
case REDIS_REPLY_INTEGER:
return reply->integer;
break;
case REDIS_REPLY_ARRAY:
assert(reply->element[0]->type==REDIS_REPLY_INTEGER);
return reply->element[0]->integer;
break;
case REDIS_REPLY_STRING:
return atoll(reply->str);
break;
default:
assert(0);
break;
}
return 0;
}
enum MAAT_TABLE_TYPE type_region2table(const struct Maat_region_t* p)
{
enum MAAT_TABLE_TYPE ret=TABLE_TYPE_IP;
@@ -107,11 +140,12 @@ void invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq)
{
j++;
}
if(j==offset)
if(j==offset-1)
{
break;
}
}
i++;
assert(i<strlen(line));
assert(line[i]=='1');
line[i]='0';
@@ -242,8 +276,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
return 0;
}
}
assert(reply->type==REDIS_REPLY_STRING);
version_in_redis=atoll(reply->str);
version_in_redis=read_redis_integer(reply);
assert(version_in_redis>=version);
freeReplyObject(reply);
if(version_in_redis==version)
@@ -260,17 +293,17 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
//Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis.
//The elements are considered to be ordered from low to high scores(version).
reply=(redisReply*)redisCommand(c, "ZRANGE MAAT_UPDATE_STATUS (%d %d",version,version_in_redis);
reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%d %d",rm_status_key,version,version_in_redis);
if(reply==NULL)
{
__redis_strerror_r(errno,err_buff,sizeof(err_buff));
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"GET MAAT_UPDATE_STATUS failed %s.",err_buff);
"GET %s failed %s.",rm_status_key,err_buff);
return 0;
}
assert(reply->type==REDIS_REPLY_ARRAY);
tmp_reply=(redisReply*)redisCommand(c, "ZRANK MAAT_UPDATE_STATUS %s",reply->element[0]->str);
nearest_rule_version=tmp_reply->integer;
tmp_reply=(redisReply*)redisCommand(c, "ZSCORE %s %s",rm_status_key,reply->element[0]->str);
nearest_rule_version=read_redis_integer(tmp_reply);
freeReplyObject(tmp_reply);
tmp_reply=NULL;
if(nearest_rule_version!=version+1)
@@ -289,7 +322,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
for(i=0;i<reply->elements;i++)
{
assert(reply->element[i]->type==REDIS_REPLY_STRING);
ret=sscanf(reply->element[i]->str,"%[^:]:%[^,],%d",op_str,s_rule[i].table_name,&(s_rule[i].rule_id));
ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%d",op_str,s_rule[i].table_name,&(s_rule[i].rule_id));
assert(ret==3);
if(strncmp(op_str,"ADD",strlen("ADD"))==0)
{
@@ -328,19 +361,7 @@ FULL_UPDATE:
reply=NULL;
return i;
}
int _wrap_redisGetReply(redisContext *c, redisReply **reply)
{
return redisGetReply(c, (void **)reply);
}
redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...)
{
va_list ap;
void *reply = NULL;
va_start(ap,format);
reply = redisvCommand(c,format,ap);
va_end(ap);
return (redisReply *)reply;
}
int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cnt, int* new_group_cnt)
{
int serial_num=0;
@@ -359,6 +380,9 @@ int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cn
*new_region_cnt+=cmd->groups[i].region_num;
(*new_group_cnt)++;
}
//for MAAT_OP_DEL, if the group's refcnt>0, group->region_num=0.
//so it's OK to add.
serial_num+=cmd->groups[i].region_num;
}
return serial_num;
}
@@ -449,7 +473,7 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
set_serial_rule(list+rule_num,MAAT_OP_DEL,cmd->compile.config_id,feather->compile_tn,NULL);
}
rule_num++;
for(i=0;cmd->group_num;i++)
for(i=0;i<cmd->group_num;i++)
{
p_group=&(cmd->groups[i]);
if(op==MAAT_OP_ADD)
@@ -489,20 +513,19 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
rule_num++;
}
}
assert(rule_num<size);
assert(rule_num<=size);
return rule_num;
}
int mr_transaction_success(redisReply* data_reply)
{
unsigned int i=0;
for(i=0;i<data_reply->elements;i++)
if(data_reply->type==REDIS_REPLY_NIL)
{
if(data_reply->element[i]->type==REDIS_REPLY_NIL)
{
return 0;
}
return 0;
}
else
{
return 1;
}
return 1;
}
int fix_table_name(_Maat_feather_t* feather,struct Maat_command_t* cmd)
{
@@ -512,6 +535,18 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_command_t* cmd)
struct Maat_group_t* p_group=NULL;
struct Maat_region_t* p_region=NULL;
enum MAAT_TABLE_TYPE table_type;
struct _Maat_compile_inner_t *compile_rule=NULL;
if(feather->scanner!=NULL)
{
compile_rule=(struct _Maat_compile_inner_t*)HASH_fetch_by_id(feather->scanner->compile_hash, cmd->compile.config_id);
if(compile_rule!=NULL)
{
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
,"Maat rule %d already exisits.",cmd->compile.config_id);
return -1;
}
}
for(i=0;i<cmd->group_num;i++)
{
p_group=&(cmd->groups[i]);
@@ -529,7 +564,7 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_command_t* cmd)
return -1;
}
table_type=type_region2table(p_region);
if(table_type!=feather->p_table_info[i]->table_type)
if(table_type!=feather->p_table_info[table_id]->table_type)
{
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
,"Table %s not support region type %d of Maat_command_t[%d]->group[%d]->region[%d]."
@@ -539,7 +574,7 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_command_t* cmd)
return -1;
}
free((char*)p_region->table_name);
p_region->table_name=_maat_strdup(feather->p_table_info[i]->table_name[0]);
p_region->table_name=_maat_strdup(feather->p_table_info[table_id]->table_name[0]);
}
}
return 0;
@@ -671,7 +706,14 @@ struct Maat_command_t* Maat_create_comand(const struct Maat_rule_t* rule, int gr
memcpy(&(_cmd->cmd.compile),rule,sizeof(_cmd->cmd.compile));
_cmd->ref_cnt=1;
_cmd->cmd.group_num=group_num;
_cmd->cmd.groups=(struct Maat_group_t*)calloc(sizeof(struct Maat_group_t),group_num);
if(group_num>0)
{
_cmd->cmd.groups=(struct Maat_group_t*)calloc(sizeof(struct Maat_group_t),group_num);
}
else
{
_cmd->cmd.groups=NULL;
}
for(i=0;i<group_num;i++)
{
_cmd->cmd.groups[i].regions=(struct Maat_region_t*)calloc(sizeof(struct Maat_region_t),1);
@@ -795,13 +837,21 @@ int Maat_commit_command(Maat_feather_t feather)
long long maat_redis_version=0;
int new_region_num=0,new_group_num=0;
int serial_rule_num=0,serial_rule_idx=0;
int redis_transaction_failed=1;
struct _Maat_cmd_inner_t* p=NULL,*n=NULL;
redisContext* ctx=NULL;
redisReply* data_reply=NULL;
struct serial_rule_t* s_rule=NULL;
if(_feather->REDIS_MODE_ON==0)
{
return -1;
}
if(_feather->cmd_q_cnt==0)
{
return 0;
}
if(_feather->redis_write_ctx==NULL)
{
_feather->redis_write_ctx=redisConnectWithTimeout(_feather->redis_ip, _feather->redis_port,_feather->connect_timeout);
@@ -815,11 +865,10 @@ int Maat_commit_command(Maat_feather_t feather)
}
}
ctx=_feather->redis_write_ctx;
for(i=0,p=_feather->cmd_qhead;i<_feather->cmd_q_cnt;i++)
{
p=p->next;
serial_rule_num+=calculate_serial_rule_num(p, &new_region_num, &new_group_num);
p=p->next;
}
data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_REGION %d",new_region_num);
@@ -836,34 +885,36 @@ int Maat_commit_command(Maat_feather_t feather)
for(i=0,p=_feather->cmd_qhead;i<_feather->cmd_q_cnt;i++)
{
p=p->next;
serial_rule_idx+=build_serial_rule(_feather,p,s_rule, serial_rule_num-serial_rule_idx);
p=p->next;
}
assert(serial_rule_idx==serial_rule_num);
while(1)
redis_transaction_failed=1;
while(redis_transaction_failed)
{
data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION");
freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION");
freeReplyObject(data_reply);
maat_redis_version=atoll(data_reply->str);
maat_redis_version=read_redis_integer(data_reply);
maat_redis_version++;
freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx,"MULTI");
freeReplyObject(data_reply);
append_cmd_cnt=0;
for(i=0;i<serial_rule_num;i++)
{
if(s_rule[i].op==MAAT_OP_ADD)
{
redisAppendCommand(ctx,"SET %s:%s,%d \"%s\"",rm_key_prefix[MAAT_OP_ADD]
redisAppendCommand(ctx,"SET %s:%s,%d %s",rm_key_prefix[MAAT_OP_ADD]
,s_rule[i].table_name
,s_rule[i].rule_id
,s_rule[i].table_line);
append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"ZADD NX ADD,%s,%d %d",s_rule[i].table_name
,s_rule[i].rule_id
,maat_redis_version);
redisAppendCommand(ctx,"ZADD %s NX %d ADD,%s,%d",rm_status_key
,maat_redis_version
,s_rule[i].table_name
,s_rule[i].rule_id);
append_cmd_cnt++;
}
else
@@ -884,9 +935,10 @@ int Maat_commit_command(Maat_feather_t feather)
,MAAT_REDIS_SYNC_TIME);
append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"ZADD NX DEL,%s,%d %d",s_rule[i].table_name
,s_rule[i].rule_id
,maat_redis_version);
redisAppendCommand(ctx,"ZADD %s NX %d DEL,%s,%d",rm_status_key
,maat_redis_version
,s_rule[i].table_name
,s_rule[i].rule_id);
append_cmd_cnt++;
}
}
@@ -894,20 +946,20 @@ int Maat_commit_command(Maat_feather_t feather)
append_cmd_cnt++;
redisAppendCommand(ctx,"EXEC");
append_cmd_cnt++;
redis_transaction_failed=0;
for(i=0;i<append_cmd_cnt;i++)
{
_wrap_redisGetReply(ctx, &data_reply);
if(1==mr_transaction_success(data_reply))
if(0==mr_transaction_success(data_reply))
{
freeReplyObject(data_reply);
break;
}
else
{
retry++;
assert(retry<5);
freeReplyObject(data_reply);
redis_transaction_failed=1;
}
freeReplyObject(data_reply);
}
if(redis_transaction_failed==1)
{
retry++;
assert(retry<5);
}
}
_feather->cmd_acc_num+=_feather->cmd_q_cnt;