diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index 5c96059..83be9cc 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -406,15 +406,17 @@ int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** lis } if(version_in_redis>version&&cumulative_off==1) { - target_version=version+1; + target_version=version; } else { - target_version=version_in_redis; + target_version=version_in_redis-1; } do{ - //Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis. - //The elements are considered to be ordered from low to high scores(version). + + target_version++; + //Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis. + //The elements are considered to be ordered from low to high scores(version). reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",rm_status_sset,version,target_version); if(reply==NULL) @@ -431,19 +433,19 @@ int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** lis //a duplicate rule_id would induce this error. freeReplyObject(reply); } - target_version++; + }while(rule_num==0&&target_version<=version_in_redis&&cumulative_off==1); if(rule_num==0) { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative=%d" - ,rm_status_sset,version,target_version-1,!cumulative_off); + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s" + ,rm_status_sset,version,target_version-1,cumulative_off==1?"OFF":"ON"); goto FULL_UPDATE; } tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str); if(tmp_reply->type!=REDIS_REPLY_STRING) { MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "ZSCORE %s %s failed Version: %lld->%lld",rm_status_sset,reply->element[0]->str,version, version_in_redis); + "ZSCORE %s %s failed Version: %lld->%lld",rm_status_sset,reply->element[0]->str,version, target_version); free(tmp_reply); free(reply); goto FULL_UPDATE; @@ -460,7 +462,7 @@ int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** lis goto FULL_UPDATE; } MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "Inc Update form version %lld to %lld (%lld entries).",version,version_in_redis,reply->elements); + "Inc Update form version %lld to %lld (%lld entries).",version,target_version,reply->elements); s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t)); for(i=0;ielements;i++) @@ -484,7 +486,7 @@ int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** lis *list=s_rule; *update_type=CM_UPDATE_TYPE_INC; freeReplyObject(reply); - *new_version=version_in_redis; + *new_version=target_version; return i; FULL_UPDATE: MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, @@ -826,13 +828,10 @@ int mr_operation_success(redisReply* data_reply) return 1; } -#define REDIS_OP_PER_SRULE 8 -int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger) +long long _exec_serial_rule_begin(redisContext* ctx) { - int i=0,j=0,ret=0; long long maat_redis_version=0; redisReply* data_reply=NULL; - int redis_transaction_success=1; data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION"); freeReplyObject(data_reply); data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION"); @@ -840,12 +839,25 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_ maat_redis_version++; freeReplyObject(data_reply); data_reply=_wrap_redisCommand(ctx,"MULTI"); + return maat_redis_version; +} +redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time) +{ + redisReply* data_reply=NULL; + + data_reply=_wrap_redisCommand(ctx,"ZADD %s NX %d %d",rm_version_sset,server_time,maat_redis_version); freeReplyObject(data_reply); - int max_append_cnt=serial_rule_num*REDIS_OP_PER_SRULE+4; + data_reply=_wrap_redisCommand(ctx,"INCRBY MAAT_VERSION 1"); + freeReplyObject(data_reply); + data_reply=_wrap_redisCommand(ctx,"EXEC"); + return data_reply; +} +void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_t* s_rule, int rule_num, int* multi_cmd_seq, unsigned int *cnt, int offset) +{ + int i=0,j=0,ret=0; + redisReply* data_reply=NULL; int append_cmd_cnt=0; - int *pipeline_seq=(int*)calloc(sizeof(int),max_append_cnt); - assert(server_time>0); - for(i=0;i0) { @@ -868,7 +880,7 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_ ,s_rule[i].timeout ,s_rule[i].table_name ,s_rule[i].rule_id); - pipeline_seq[append_cmd_cnt]=i; + multi_cmd_seq[(*cnt)++]=i+offset; append_cmd_cnt++; } if(s_rule[i].label_id>0) @@ -876,65 +888,74 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_ redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset ,s_rule[i].label_id ,s_rule[i].rule_id); - pipeline_seq[append_cmd_cnt]=i; + multi_cmd_seq[(*cnt)++]=i+offset; append_cmd_cnt++; } } else { - ret=del_rule_from_redis(ctx,s_rule+i,maat_redis_version); + ret=del_rule_from_redis(ctx,s_rule+i,version); for(j=0;jelements==multi_cmd_cnt+2); + for(i=0;ielements;i++) { - success_cnt+=batch_cnt; - failed_batch=0; - } - else - { - failed_batch++; + p=transaction_reply->element[i]; + if(0==mr_operation_success(p)) + { + j=multi_cmd_seq[i]; + assert(jrule_id,line_rule[i]->label_id,line_rule[i]->table_name,line_rule[i]->table_line, absolute_expire_time); } ret=0; - while(success_cntredis_write_ctx,s_rule+success_cnt, line_num-success_cnt,server_time,_feather->logger); - retry++; + success_cnt=exec_serial_rule(_feather->redis_write_ctx,s_rule, line_num,server_time,_feather->logger); + if(success_cnt<0)//transaction failed + { + retry++; + } + else + { + break; + } + } - if(retry>10) + if(retry>5) { MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_command ,"Command set line id %d success after retry %d times." @@ -1507,13 +1536,17 @@ int Maat_cmd_commit(Maat_feather_t feather) } assert(serial_rule_idx==serial_rule_num); transection_success=0; - while(transection_successserver_time,_feather->logger); - if(transection_successserver_time,_feather->logger); + if(transection_success==-1) { retry++; } + else + { + break; + } } if(retry>5) { @@ -1594,9 +1627,16 @@ int redis_flush_DB(redisContext* ctx, int db_index, void* logger) data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION"); freeReplyObject(data_reply); data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION"); - maat_redis_version=read_redis_integer(data_reply); - maat_redis_version++; - freeReplyObject(data_reply); + if(data_reply->type==REDIS_REPLY_NIL) + { + maat_redis_version=0; + } + else + { + maat_redis_version=read_redis_integer(data_reply); + maat_redis_version++; + freeReplyObject(data_reply); + } data_reply=_wrap_redisCommand(ctx, "DBSIZE"); dbsize=read_redis_integer(data_reply); freeReplyObject(data_reply); @@ -1626,7 +1666,7 @@ int redis_flush_DB(redisContext* ctx, int db_index, void* logger) { MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_command ,"FlushDB %d, MAAT_VERSION=%llu, DBSize=%llu." - ,db_index, maat_redis_version-1,dbsize + ,db_index, (maat_redis_version==0)?0:(maat_redis_version-1),dbsize ); } return redis_transaction_success; diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp index 43c6c95..52b3ce4 100644 --- a/src/entry/Maat_rule.cpp +++ b/src/entry/Maat_rule.cpp @@ -30,7 +30,7 @@ #include "stream_fuzzy_hash.h" #include "gram_index_engine.h" -int MAAT_FRAME_VERSION_2_1_20180316=1; +int MAAT_FRAME_VERSION_2_1_20180322=1; const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", "unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""}; diff --git a/tools/maat_redis_tool.cpp b/tools/maat_redis_tool.cpp index f5fdbcc..797aa36 100644 --- a/tools/maat_redis_tool.cpp +++ b/tools/maat_redis_tool.cpp @@ -19,6 +19,7 @@ void maat_tool_print_usage(void) printf("Usage:\n"); printf("\t-h [host], redis IP, 127.0.0.1 as default.\n"); printf("\t-p [port], redis port, 6379 as default.\n"); + printf("\t-n [db], redis db, 0 as default.\n"); printf("\t-d [dir], dump rules from redis to [dir], %s as default.\n",redis_dump_dir); printf("\t-j [payload.json], add or delete rules as maat json. Must have field compile_table field, and plugin table's valid flag must be in the last column.\n"); printf("\t-t [timeout], timeout config after t seconds, default 0, not timeout.\n"); @@ -41,6 +42,7 @@ static redisContext * connect_redis(const char*redis_ip, int redis_port, int red struct timeval connect_timeout; connect_timeout.tv_sec=0; connect_timeout.tv_usec=100*1000; // 100 ms + redisReply* reply=NULL; redisContext * ctx; ctx=redisConnectWithTimeout(redis_ip, redis_port,connect_timeout); @@ -49,6 +51,9 @@ static redisContext * connect_redis(const char*redis_ip, int redis_port, int red printf("Unable to connect %s:%d db%d\n",redis_ip,redis_port,redis_db); return NULL; } + reply=_wrap_redisCommand(ctx, "select %d",redis_db); + freeReplyObject(reply); + return ctx; } @@ -223,7 +228,7 @@ int main(int argc, char * argv[]) unsigned long json_file_size=0,read_size=0; char* json_buff=NULL; - while((oc=getopt(argc,argv,"h:p:d:f:j:t:"))!=-1) + while((oc=getopt(argc,argv,"h:p:n:d:f:j:t:"))!=-1) { switch(oc) { @@ -233,6 +238,9 @@ int main(int argc, char * argv[]) case 'p': sscanf(optarg,"%d",&redis_port); break; + case 'n': + sscanf(optarg,"%d",&redis_db); + break; case 'd': model=WORK_MODE_DUMP; strncpy(dump_dir,optarg,sizeof(dump_dir)); @@ -321,9 +329,13 @@ int main(int argc, char * argv[]) config_monitor_traverse(0, tmp_iris_path, NULL, make_serial_rule, NULL, s_rule,NULL, NULL); printf("Timeout=%lld\n",absolute_expire_time); ret=0; - while(success_cnt