diff --git a/src/entry/Maat_api.cpp b/src/entry/Maat_api.cpp index f2dcc17..c483ee2 100644 --- a/src/entry/Maat_api.cpp +++ b/src/entry/Maat_api.cpp @@ -599,7 +599,7 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo _feather->redis_index=*((int*)value); break; case MAAT_OPT_CMD_AUTO_NUMBERING: - if((size_t)size!=sizeof(int)) + if((size_t)size!=sizeof(int)||*((int*)value)>15||*((int*)value)<0) { return -1; } @@ -613,6 +613,7 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo int Maat_initiate_feather(Maat_feather_t feather) { _Maat_feather_t* _feather=(_Maat_feather_t*)feather; + redisReply* reply=NULL; if(strlen(_feather->redis_ip)>0&&_feather->redis_port!=0) { @@ -630,6 +631,8 @@ int Maat_initiate_feather(Maat_feather_t feather) return -1; } redisEnableKeepAlive(_feather->redis_read_ctx); + reply=_wrap_redisCommand(_feather->redis_read_ctx, "select %d",_feather->redis_index); + freeReplyObject(reply); redis_monitor_traverse(_feather->maat_version ,_feather->redis_read_ctx ,maat_start_cb diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index 7e869da..190a36a 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -14,9 +14,10 @@ const char* maat_redis_command="MAAT_REDIS_COMMAND"; const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"}; const char* rm_status_sset="MAAT_UPDATE_STATUS"; -const char* rm_expire_sset="MAAT_EXPIRE_TIMER"; +const char* rm_expire_sset="MAAT_RULE_TIMER"; const char* rm_label_sset="MAAT_LABEL_INDEX"; -const int MAAT_REDIS_SYNC_TIME=30*60; +const char* rm_version_sset="MAAT_VERSION_TIMER"; +const static int MAAT_REDIS_SYNC_TIME=30*60; struct serial_rule_t //rm= Redis Maat { @@ -52,6 +53,7 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...) int connect_redis_for_write(_Maat_feather_t * feather) { int ret=0; + redisReply* reply=NULL; assert(feather->redis_write_ctx==NULL); feather->redis_write_ctx=redisConnectWithTimeout(feather->redis_ip, feather->redis_port,feather->connect_timeout); if(feather->redis_write_ctx==NULL) @@ -61,6 +63,11 @@ int connect_redis_for_write(_Maat_feather_t * feather) ,feather->redis_ip,feather->redis_port); ret=-1; } + else + { + reply=_wrap_redisCommand(feather->redis_read_ctx, "select %d",feather->redis_index); + freeReplyObject(reply); + } return ret; } long long read_redis_integer(const redisReply* reply) @@ -386,11 +393,9 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** goto FULL_UPDATE; } - else - { - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "Inc Update form version %d to %lld.",version,version_in_redis); - } + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, + "Inc Update form version %d to %lld.",version,version_in_redis); + s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t)); for(i=0;ielements;i++) { @@ -675,7 +680,7 @@ int mr_transaction_success(redisReply* data_reply) return 1; } } -int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num) +int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time) { int append_cmd_cnt=0,i=0; long long maat_redis_version=0; @@ -729,6 +734,8 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_r } redisAppendCommand(ctx,"INCRBY MAAT_VERSION 1"); append_cmd_cnt++; + redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_version_sset,server_time,maat_redis_version); + append_cmd_cnt++; redisAppendCommand(ctx,"EXEC"); append_cmd_cnt++; redis_transaction_success=1; @@ -803,9 +810,9 @@ void check_maat_expiration(redisContext *ctx, void *logger) redisReply* data_reply=NULL; struct serial_rule_t* s_rule=NULL; long long server_time=0; - data_reply=_wrap_redisCommand(ctx, "TIME"); - server_time=data_reply->element[0]->integer; - freeReplyObject(data_reply); + + server_time=redis_server_time(ctx); + data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_expire_sset,server_time); if(data_reply->type!=REDIS_REPLY_ARRAY||data_reply->elements==0) { @@ -821,21 +828,69 @@ void check_maat_expiration(redisContext *ctx, void *logger) assert(ret==2); } freeReplyObject(data_reply); - is_success=exec_serial_rule(ctx,s_rule, s_rule_num); + is_success=exec_serial_rule(ctx,s_rule, s_rule_num,server_time); if(is_success==1) { - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_module + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor ,"Succesfully expried %d rules in Redis.", s_rule_num); } else { - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_module + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor ,"Failed to expried %d rules in Redis.", s_rule_num); } + free(s_rule); return; } +void cleanup_update_status(redisContext *ctx, void *logger) +{ + redisReply* reply=NULL,*sub_reply=NULL; + int append_cmd_cnt=0,i=0; + long long server_time=0, version_upper_bound=0,version_lower_bound=0,version_num=0,entry_num=0; + + server_time=redis_server_time(ctx); + + reply=_wrap_redisCommand(ctx,"MULTI"); + freeReplyObject(reply); + redisAppendCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_version_sset,server_time-MAAT_REDIS_SYNC_TIME); + append_cmd_cnt++; + redisAppendCommand(ctx, "ZREMRANGEBYSCORE %s -inf %lld",rm_version_sset,server_time-MAAT_REDIS_SYNC_TIME); + append_cmd_cnt++; + //consume reply "OK" and "QUEUED". + for(i=0;itype==REDIS_REPLY_ARRAY); + sub_reply=reply->element[0]; + assert(sub_reply->type==REDIS_REPLY_ARRAY); + version_num=sub_reply->elements; + if(version_num==0) + { + freeReplyObject(reply); + return; + } + version_lower_bound=read_redis_integer(sub_reply->element[0]); + version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]); + freeReplyObject(reply); + + reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s -inf %lld",rm_expire_sset,version_upper_bound); + entry_num=read_redis_integer(reply); + freeReplyObject(reply); + + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor + ,"Clean up updaste status from version %lld to %lld (%lld versions, %lld entries)." + ,version_lower_bound + ,version_upper_bound + ,version_num + ,entry_num); + +} void redis_monitor_traverse(unsigned int version,redisContext *c, void (*start)(unsigned int ,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para @@ -856,6 +911,7 @@ void redis_monitor_traverse(unsigned int version,redisContext *c, { //For thread safe, deliberately use redis_read_ctx but not redis_write_ctx. check_maat_expiration(feather->redis_read_ctx, logger); + cleanup_update_status(feather->redis_read_ctx, logger); } rule_num=get_rm_key_list(version, c, &rule_list, logger,&new_version, &update_type); if(rule_num==0) @@ -1020,7 +1076,7 @@ int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule ret=0; while(!ret) { - ret=exec_serial_rule(_feather->redis_write_ctx,&s_rule, 1); + ret=exec_serial_rule(_feather->redis_write_ctx,&s_rule, 1,_feather->server_time); retry++; } if(retry>10) @@ -1189,7 +1245,7 @@ int Maat_cmd_commit(Maat_feather_t feather) transection_success=0; while(!transection_success) { - transection_success=exec_serial_rule(ctx, s_rule,serial_rule_num); + transection_success=exec_serial_rule(ctx, s_rule,serial_rule_num,_feather->server_time); if(transection_success!=1) { retry++; diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp index af2e317..d8eff56 100644 --- a/src/entry/Maat_rule.cpp +++ b/src/entry/Maat_rule.cpp @@ -28,7 +28,7 @@ #include "stream_fuzzy_hash.h" #include "gram_index_engine.h" -int MAAT_FRAME_VERSION_2_0_20170809=1; +int MAAT_FRAME_VERSION_2_0_20170810=1; const char *maat_module="MAAT Frame"; const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", diff --git a/src/entry/Maat_rule_internal.h b/src/entry/Maat_rule_internal.h index dbd3f4f..7ae040c 100644 --- a/src/entry/Maat_rule_internal.h +++ b/src/entry/Maat_rule_internal.h @@ -452,6 +452,7 @@ void maat_stat_table(struct _Maat_table_info_t* p_table,int scan_len,struct time void maat_stat_output(struct _Maat_feather_t* feather); char* _maat_strdup(const char* s); char* str_unescape(char* s); +redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...); void redis_monitor_traverse(unsigned int version,redisContext *c, void (*start)(unsigned int ,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para