diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index db76312..8ebeee3 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -211,7 +211,8 @@ int del_rule_from_redis(redisContext* ctx, struct serial_rule_t* s_rule, long lo if(s_rule->label_id>0) { redisAppendCommand(ctx,"ZREM %s %d",rm_label_sset - ,s_rule->rule_id); + ,s_rule->rule_id); + append_cmd_cnt++; } return append_cmd_cnt; @@ -315,12 +316,12 @@ void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_ } int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** list,void* logger, unsigned int* new_version,int *update_type) { - redisReply* reply=NULL,*tmp_reply=NULL; + redisReply* reply=NULL,*sub_reply=NULL,*tmp_reply=NULL; char err_buff[256]; char op_str[4]; long long version_in_redis=0,nearest_rule_version=0; int ret=0,retry=0; - unsigned int i=0; + unsigned int i=0,full_idx =0,append_cmd_cnt=0; struct serial_rule_t *s_rule=NULL; @@ -381,7 +382,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** if(nearest_rule_version!=version+1) { MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "Noncontinuous VERSION Redis: %lld MAAT: %d.",tmp_reply->integer,version); + "Noncontinuous VERSION Redis: %lld MAAT: %d.",nearest_rule_version,version); goto FULL_UPDATE; } @@ -416,24 +417,82 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** FULL_UPDATE: MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, "Initiate full udpate from version %d to %lld.",version,version_in_redis); - reply=(redisReply*)redisCommand(c, "KEYS EFFECTIVE_RULE:*"); - assert(reply->type==REDIS_REPLY_ARRAY); - s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t)); - for(i=0;ielements;i++) + append_cmd_cnt=0; + ret=redisAppendCommand(c, "MULTI"); + append_cmd_cnt++; + ret=redisAppendCommand(c, "GET MAAT_VERSION"); + append_cmd_cnt++; + ret=redisAppendCommand(c, "KEYS EFFECTIVE_RULE:*"); + append_cmd_cnt++; + //consume reply "OK" and "QUEUED". + for(i=0;ielement[i]->type==REDIS_REPLY_STRING); - ret=sscanf(reply->element[i]->str,"%*[^:]:%[^,],%d",s_rule[i].table_name,&(s_rule[i].rule_id)); - s_rule[i].op=MAAT_OP_ADD; + _wrap_redisGetReply(c, &reply); + freeReplyObject(reply); + reply=NULL; + } + reply=_wrap_redisCommand(c,"EXEC"); + assert(reply->type==REDIS_REPLY_ARRAY); + *new_version=read_redis_integer(reply->element[0]); + sub_reply=reply->element[1]; + assert(sub_reply->type==REDIS_REPLY_ARRAY); + s_rule=(struct serial_rule_t*)calloc(sub_reply->elements,sizeof(struct serial_rule_t)); + for(full_idx=0;full_idxelements;full_idx++) + { + assert(sub_reply->element[full_idx]->type==REDIS_REPLY_STRING); + ret=sscanf(sub_reply->element[full_idx]->str,"%*[^:]:%[^,],%d",s_rule[full_idx].table_name,&(s_rule[full_idx].rule_id)); + s_rule[full_idx].op=MAAT_OP_ADD; assert(ret==2); } + freeReplyObject(reply); *list=s_rule; *update_type=CM_UPDATE_TYPE_FULL; - - freeReplyObject(reply); - reply=NULL; - return i; + + return full_idx ; +} +void get_rm_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger) +{ + int i=0,ret=0,failed_cnt=0,idx=0; + int *retry_ids=(int*)malloc(sizeof(int)*rule_num); + char redis_cmd[256]; + redisReply* reply=NULL; + for(i=0;itype==REDIS_REPLY_STRING) + { + rule_list[i].table_line=_maat_strdup(reply->str); + } + else + { + assert(reply->type==REDIS_REPLY_NIL); + retry_ids[failed_cnt]=i; + failed_cnt++; + } + freeReplyObject(reply); + } + for(i=0;itype==REDIS_REPLY_STRING); + rule_list[idx].table_line=_maat_strdup(reply->str); + freeReplyObject(reply); + } + free(retry_ids); + return; } - int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cnt, int* new_group_cnt) { int serial_num=0; @@ -471,10 +530,16 @@ int reconstruct_cmd(struct _Maat_feather_t *feather, struct _Maat_cmd_inner_t* _ void* logger=feather->logger; int config_id=cmd->compile.config_id; + if(feather->scanner==NULL) + { + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_command + ,"MAAT not ready."); + return -1; + } compile_inner=(struct _Maat_compile_inner_t *)HASH_fetch_by_id(feather->scanner->compile_hash, config_id); if(compile_inner==NULL) { - MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_command + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_command ,"config %d not exist." ,config_id); return -1; @@ -779,11 +844,9 @@ void redis_monitor_traverse(unsigned int version,redisContext *c, const unsigned char* dec_key, _Maat_feather_t* feather) { - redisReply* data_reply=NULL; unsigned int rule_num=0,i=0; int table_id=0; int ret=0; - char redis_cmd[256]; struct serial_rule_t* rule_list=NULL; int update_type=0; unsigned int new_version=0; @@ -799,19 +862,11 @@ void redis_monitor_traverse(unsigned int version,redisContext *c, { return; } - for(i=0;imap_tablename2id,rule_list[i].table_name,&table_id); @@ -820,13 +875,15 @@ void redis_monitor_traverse(unsigned int version,redisContext *c, continue; } table_type=feather->p_table_info[table_id]->table_type; - invalidate_line(data_reply->str,table_type,feather->p_table_info[table_id]->valid_flag_column); + invalidate_line(rule_list[i].table_line,table_type,feather->p_table_info[table_id]->valid_flag_column); } - update(rule_list[i].table_name,data_reply->str,u_para); - freeReplyObject(data_reply); + update(rule_list[i].table_name,rule_list[i].table_line,u_para); } finish(u_para); - // no need to calll empty_serial_rules + for(i=0;iredis_write_ctx,&s_rule, 1); retry++; - assert(retry<5); + } + if(retry>10) + { + MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module + ,"Command set line id %d success after retry %d times." + , line_rule->rule_id + ); } return 0; } diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp index 5e23ae6..2fb4bc2 100644 --- a/src/entry/Maat_rule.cpp +++ b/src/entry/Maat_rule.cpp @@ -28,7 +28,7 @@ #include "stream_fuzzy_hash.h" #include "gram_index_engine.h" -int MAAT_FRAME_VERSION_2_0_20170701=1; +int MAAT_FRAME_VERSION_2_0_20170807=1; const char *maat_module="MAAT Frame"; const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",