diff --git a/inc/Maat_command.h b/inc/Maat_command.h index ed06262..fbdd9a1 100644 --- a/inc/Maat_command.h +++ b/inc/Maat_command.h @@ -7,7 +7,8 @@ enum MAAT_OPERATION { MAAT_OP_DEL=0, - MAAT_OP_ADD + MAAT_OP_ADD, + MAAT_OP_RENEW_TIMEOUT //Rule expire time is changed to now+cmd->expire_after }; enum MAAT_REGION_TYPE @@ -137,15 +138,22 @@ char* Maat_str_escape(char* dst,int size,const char*src); //Deletion failed due to not complete synchronize with Redis. //To make sure the delete command is excecuted, user should try again after MAAT_OPT_SCANDIR_INTERVAL_MS ms. -// The following functions are NOT thread safe. +//Returns nubmer of successfully updated rule. +//The following functions are NOT thread safe. int Maat_cmd(Maat_feather_t feather,struct Maat_cmd_t* cmd,enum MAAT_OPERATION op); //pipeline model int Maat_cmd_append(Maat_feather_t feather,struct Maat_cmd_t* cmd,enum MAAT_OPERATION op); + +//Return nubmer of successfully updated rule. +//Return -1 for failed. int Maat_cmd_commit(Maat_feather_t feather); int Maat_cmd_set_group(Maat_feather_t feather, int group_id, const struct Maat_region_t* region, enum MAAT_OPERATION op); + +//Returns nubmer of successfully updated rule. +//Return -1 for failed. int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule, enum MAAT_OPERATION op); int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_rule, int line_num ,enum MAAT_OPERATION op); //Return the value of key after the increment. diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index a3d81b0..28aeba3 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #define maat_redis_monitor (module_name_str("MAAT_REDIS_MONITOR")) #define maat_command (module_name_str("MAAT_COMMAND")) @@ -18,8 +19,9 @@ const char* rm_expire_sset="MAAT_EXPIRE_TIMER"; const char* rm_label_sset="MAAT_LABEL_INDEX"; const char* rm_version_sset="MAAT_VERSION_TIMER"; const char* rm_expire_lock="EXPIRE_OP_LOCK"; +const long rm_expire_lock_timeout=300*1000; const static int MAAT_REDIS_SYNC_TIME=30*60; - +const char* rm_op_str[]={"DEL","ADD","RENEW_TIMEOUT"}; struct _Maat_cmd_inner_t @@ -532,11 +534,11 @@ int get_rm_key_list(redisContext *c, long long instance_version, long long desir do{ target_version++; rule_num=get_inc_key_list(instance_version, target_version, c, &s_rule_array,logger); - if(ret>0) + if(rule_num>0) { break; } - else if(ret<0) + else if(rule_num<0) { goto FULL_UPDATE; } @@ -862,7 +864,7 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st } else { - set_serial_rule(list+rule_num,MAAT_OP_DEL,cmd->compile.config_id,cmd->label_id,feather->compile_tn,NULL,timeout); + set_serial_rule(list+rule_num,op,cmd->compile.config_id,cmd->label_id,feather->compile_tn,NULL,timeout); } rule_num++; for(i=0;igroup_num;i++) @@ -881,7 +883,7 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st } else { - set_serial_rule(list+rule_num,MAAT_OP_DEL,p_group->group_id,0,feather->group_tn,NULL,0); + set_serial_rule(list+rule_num,op,p_group->group_id,0,feather->group_tn,NULL,timeout); } rule_num++; if(p_group->regions==NULL)//group reuse. @@ -904,8 +906,8 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st } else { - set_serial_rule(list+rule_num,MAAT_OP_DEL - ,p_region->region_id,0,p_region->table_name,NULL,0); + set_serial_rule(list+rule_num,op + ,p_region->region_id,0,p_region->table_name,NULL,timeout); } rule_num++; @@ -925,118 +927,215 @@ int mr_transaction_success(redisReply* data_reply) return 1; } } -int mr_operation_success(redisReply* data_reply) +int mr_operation_success(redisReply* actual_reply, redisReply* expected_reply) { - if(data_reply->type==REDIS_REPLY_INTEGER&&data_reply->integer==0) + if(expected_reply->type!=actual_reply->type) + { + return 0; + } + if(expected_reply->type==REDIS_REPLY_INTEGER&&expected_reply->integer!=actual_reply->integer) { return 0; } return 1; -} -long long _exec_serial_rule_begin(redisContext* ctx) -{ - long long maat_redis_version=0; - redisReply* data_reply=NULL; - data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1"); - maat_redis_version=read_redis_integer(data_reply); - freeReplyObject(data_reply); - data_reply=_wrap_redisCommand(ctx,"MULTI"); - return maat_redis_version; } -redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time) +int redlock_try_lock(redisContext *ctx, const char* lock_name, long long expire) +{ + redisReply* reply=NULL; + int ret=0; + reply=_wrap_redisCommand(ctx,"SET %s locked NX PX %lld", lock_name, expire); + if(reply->type==REDIS_REPLY_NIL) + { + ret=0; + } + else + { + ret=1; + } + freeReplyObject(reply); + return ret; +} +void redlock_unlock(redisContext * ctx, const char * lock_name) +{ + redisReply* reply=NULL; + reply=_wrap_redisCommand(ctx,"DEL %s", lock_name); + freeReplyObject(reply); + +} +struct expected_reply_t +{ + int srule_seq; + redisReply reply; +}; + +long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *maat_redis_version) +{ + int ret=0; + redisReply* data_reply=NULL; + if(renew_rule_num>0) + { + while(0==redlock_try_lock(ctx, rm_expire_lock, rm_expire_lock_timeout)) + { + usleep(1000); + } + *renew_allowed=1; + } + if(rule_num>renew_rule_num) + { + data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1"); + *maat_redis_version=read_redis_integer(data_reply); + freeReplyObject(data_reply); + } + if(*renew_allowed==1||rule_num>renew_rule_num) + { + data_reply=_wrap_redisCommand(ctx,"MULTI"); + freeReplyObject(data_reply); + ret=1; + } + return ret; +} +redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time, int renew_allowed) { redisReply* data_reply=NULL; - - data_reply=_wrap_redisCommand(ctx,"ZADD %s NX %d %d",rm_version_sset,server_time,maat_redis_version); - freeReplyObject(data_reply); - data_reply=_wrap_redisCommand(ctx,"INCRBY MAAT_VERSION 1"); - freeReplyObject(data_reply); + if(renew_allowed==1) + { + redlock_unlock(ctx, rm_expire_lock); + } + if(maat_redis_version>0) + { + data_reply=_wrap_redisCommand(ctx,"ZADD %s NX %d %d",rm_version_sset,server_time,maat_redis_version); + freeReplyObject(data_reply); + data_reply=_wrap_redisCommand(ctx,"INCRBY MAAT_VERSION 1"); + freeReplyObject(data_reply); + } data_reply=_wrap_redisCommand(ctx,"EXEC"); return data_reply; } -void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_t* s_rule, int rule_num, int* multi_cmd_seq, unsigned int *cnt, int offset) +void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_t* s_rule, int rule_num, struct expected_reply_t* expect_reply, unsigned int *cnt, int offset,int renew_allowed) { int i=0; redisReply* data_reply=NULL; int append_cmd_cnt=0; for(i=0;i0) - { - redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset + case MAAT_OP_ADD: + redisAppendCommand(ctx,"SET %s:%s,%d %s",rm_key_prefix[MAAT_OP_ADD] + ,s_rule[i].table_name + ,s_rule[i].rule_id + ,s_rule[i].table_line); + expect_reply[*cnt].srule_seq=i+offset; + expect_reply[*cnt].reply.type=REDIS_REPLY_STATUS; + (*cnt)++; + append_cmd_cnt++; + //NX: Don't update already exisiting elements. Always add new elements. + redisAppendCommand(ctx,"ZADD %s NX %lld ADD,%s,%d",rm_status_sset + ,version + ,s_rule[i].table_name + ,s_rule[i].rule_id); + expect_reply[*cnt].srule_seq=i+offset; + expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER; + expect_reply[*cnt].reply.integer=1; + (*cnt)++; + append_cmd_cnt++; + if(s_rule[i].timeout>0) + { + redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset + ,s_rule[i].timeout + ,s_rule[i].table_name + ,s_rule[i].rule_id); + expect_reply[*cnt].srule_seq=i+offset; + expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER; + expect_reply[*cnt].reply.integer=1; + (*cnt)++; + append_cmd_cnt++; + } + if(s_rule[i].label_id>0) + { + redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset + ,s_rule[i].label_id + ,s_rule[i].rule_id); + expect_reply[*cnt].srule_seq=i+offset; + expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER; + expect_reply[*cnt].reply.integer=1; + (*cnt)++; + + append_cmd_cnt++; + } + break; + case MAAT_OP_DEL: + redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d" + ,rm_key_prefix[MAAT_OP_ADD] + ,s_rule[i].table_name + ,s_rule[i].rule_id + ,rm_key_prefix[MAAT_OP_DEL] + ,s_rule[i].table_name + ,s_rule[i].rule_id + ); + expect_reply[*cnt].srule_seq=i+offset; + expect_reply[*cnt].reply.type=REDIS_REPLY_STATUS; + (*cnt)++; + append_cmd_cnt++; + + redisAppendCommand(ctx,"EXPIRE %s:%s,%d %d",rm_key_prefix[MAAT_OP_DEL] + ,s_rule[i].table_name + ,s_rule[i].rule_id + ,MAAT_REDIS_SYNC_TIME); + expect_reply[*cnt].srule_seq=i+offset; + expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER; + expect_reply[*cnt].reply.integer=1; + (*cnt)++; + append_cmd_cnt++; + + //NX: Don't update already exisiting elements. Always add new elements. + redisAppendCommand(ctx,"ZADD %s NX %d DEL,%s,%d",rm_status_sset + ,version + ,s_rule[i].table_name + ,s_rule[i].rule_id); + expect_reply[*cnt].srule_seq=i+offset; + expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER; + expect_reply[*cnt].reply.integer=1; + (*cnt)++; + append_cmd_cnt++; + + // Try to remove from expiration sorted set, no matter wheather it exists or not. + redisAppendCommand(ctx,"ZREM %s %s,%d",rm_expire_sset + ,s_rule[i].table_name + ,s_rule[i].rule_id); + expect_reply[*cnt].srule_seq=-1; + (*cnt)++; + append_cmd_cnt++; + + redisAppendCommand(ctx,"ZREM %s %d",rm_label_sset + ,s_rule[i].rule_id); + expect_reply[*cnt].srule_seq=-1; + (*cnt)++; + append_cmd_cnt++; + break; + case MAAT_OP_RENEW_TIMEOUT: + if(renew_allowed!=1) + { + continue; + } + //s_rule[i].timeout>0 was checked by caller. + //XX: Only update elements that already exist. Never add elements. + redisAppendCommand(ctx,"ZADD %s XX %lld %s,%d",rm_expire_sset ,s_rule[i].timeout ,s_rule[i].table_name ,s_rule[i].rule_id); - multi_cmd_seq[(*cnt)++]=i+offset; + expect_reply[*cnt].srule_seq=i+offset; + expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER; + expect_reply[*cnt].reply.integer=0; + (*cnt)++; append_cmd_cnt++; - } - if(s_rule[i].label_id>0) - { - redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset - ,s_rule[i].label_id - ,s_rule[i].rule_id); - multi_cmd_seq[(*cnt)++]=i+offset; - append_cmd_cnt++; - } + + break; + default: + assert(0); + break; } - else - { - redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d" - ,rm_key_prefix[MAAT_OP_ADD] - ,s_rule[i].table_name - ,s_rule[i].rule_id - ,rm_key_prefix[MAAT_OP_DEL] - ,s_rule[i].table_name - ,s_rule[i].rule_id - ); - multi_cmd_seq[(*cnt)++]=i+offset; - append_cmd_cnt++; - - redisAppendCommand(ctx,"EXPIRE %s:%s,%d %d",rm_key_prefix[MAAT_OP_DEL] - ,s_rule[i].table_name - ,s_rule[i].rule_id - ,MAAT_REDIS_SYNC_TIME); - multi_cmd_seq[(*cnt)++]=i+offset; - append_cmd_cnt++; - - //NX: Don't update already exisiting elements. Always add new elements. - redisAppendCommand(ctx,"ZADD %s NX %d DEL,%s,%d",rm_status_sset - ,version - ,s_rule[i].table_name - ,s_rule[i].rule_id); - multi_cmd_seq[(*cnt)++]=i+offset; - append_cmd_cnt++; - - // Try to remove from expiration sorted set, no matter wheather it exists or not. - redisAppendCommand(ctx,"ZREM %s %s,%d",rm_expire_sset - ,s_rule[i].table_name - ,s_rule[i].rule_id); - multi_cmd_seq[(*cnt)++]=-1; - append_cmd_cnt++; - - redisAppendCommand(ctx,"ZREM %s %d",rm_label_sset - ,s_rule[i].rule_id); - multi_cmd_seq[(*cnt)++]=-1; - append_cmd_cnt++; - - } - } for(i=0;ielements==multi_cmd_cnt+2); - for(i=0;ielements;i++) + assert(transaction_reply->elements-2<=multi_cmd_cnt); + for(i=0;ielement[i]; - j=multi_cmd_seq[i]; - if(j!=-1&&0==mr_operation_success(p)) - { - assert(j0&&renew_allowed!=1) + { + for(i=0;i<(unsigned int)serial_rule_num;i++) + { + if(s_rule[i].op==MAAT_OP_RENEW_TIMEOUT) + { + MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command + ,"%s %s %d is not allowed due to lock contention.",rm_op_str[MAAT_OP_RENEW_TIMEOUT] + , s_rule[i].table_name,s_rule[i].rule_id); + } + } + if(success_cnt>0) + { + success_cnt-=renew_num; + } + } + free(expected_reply); return success_cnt; } @@ -1236,29 +1371,7 @@ void cleanup_update_status(redisContext *ctx, void *logger) ,entry_num); } -int redlock_try_lock(redisContext *ctx, const char* lock_name, long long expire) -{ - redisReply* reply=NULL; - int ret=0; - reply=_wrap_redisCommand(ctx,"SET %s locked NX PX %lld", lock_name, expire); - if(reply->type==REDIS_REPLY_NIL) - { - ret=0; - } - else - { - ret=1; - } - freeReplyObject(reply); - return ret; -} -void redlock_unlock(redisContext * ctx, const char * lock_name) -{ - redisReply* reply=NULL; - reply=_wrap_redisCommand(ctx,"DEL %s", lock_name); - freeReplyObject(reply); -} void redis_monitor_traverse(long long version,redisContext *c, void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para @@ -1277,7 +1390,7 @@ void redis_monitor_traverse(long long version,redisContext *c, if(feather->redis_write_ctx!=NULL&&feather->redis_write_ctx->err==0)//authorized to write { //For thread safe, deliberately use redis_read_ctx but not redis_write_ctx. - if(1==redlock_try_lock(feather->redis_read_ctx, rm_expire_lock, 300*1000)) + if(1==redlock_try_lock(feather->redis_read_ctx, rm_expire_lock, rm_expire_lock_timeout)) { check_maat_expiration(feather->redis_read_ctx, logger); cleanup_update_status(feather->redis_read_ctx, logger); @@ -1321,7 +1434,7 @@ void redis_monitor_traverse(long long version,redisContext *c, } } start(new_version,update_type,u_para); - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Start %s update: %lld->%lld (%d entries)\n", + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Start %s update: %lld->%lld (%d entries).", update_type==CM_UPDATE_TYPE_INC?"INC":"FULL",version,new_version,rule_num); for(i=0;iredis_write_ctx==NULL) @@ -1539,34 +1652,21 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru goto error_out; } } + if(op==MAAT_OP_RENEW_TIMEOUT) + { + assert(line_rule[i]->expire_after>0); + } if(line_rule[i]->expire_after>0) { absolute_expire_time=server_time+line_rule[i]->expire_after; } set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id,line_rule[i]->table_name,line_rule[i]->table_line, absolute_expire_time); } - ret=0; - while(success_cntredis_write_ctx,s_rule, line_num,server_time,_feather->logger); - if(success_cnt<0)//transaction failed - { - retry++; - } - else - { - break; - } - - } + success_cnt=exec_serial_rule(_feather->redis_write_ctx,s_rule, line_num,server_time,_feather->logger); + assert(success_cnt==line_num); + ret=success_cnt; _feather->line_cmd_acc_num+=success_cnt; - if(retry>5) - { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_command - ,"Command set line id %d success after retry %d times." - , line_rule[0]->rule_id, retry - ); - } + error_out: for(i=0;iop=op; - assert(op==MAAT_OP_DEL||op==MAAT_OP_ADD); + assert(op==MAAT_OP_DEL||op==MAAT_OP_ADD||op==MAAT_OP_RENEW_TIMEOUT); assert(_cmd->next==NULL); - if(op==MAAT_OP_DEL) + if(op==MAAT_OP_RENEW_TIMEOUT) { - ret=reconstruct_cmd(_feather, _cmd); + assert(cmd->expire_after>0); } - else + switch(op) { - ret=fix_table_name(_feather, cmd); + case MAAT_OP_DEL: + case MAAT_OP_RENEW_TIMEOUT: + ret=reconstruct_cmd(_feather, _cmd); + break; + case MAAT_OP_ADD: + ret=fix_table_name(_feather, cmd); + break; } if(ret<0) { @@ -1688,7 +1794,7 @@ int Maat_cmd_commit(Maat_feather_t feather) { _Maat_feather_t* _feather=(_Maat_feather_t*)feather; - int ret=0,i=0,retry=0; + int ret=0,i=0; int new_region_num=0,new_group_num=0; int serial_rule_num=0,serial_rule_idx=0; int transection_success=1; @@ -1711,6 +1817,7 @@ int Maat_cmd_commit(Maat_feather_t feather) ret=connect_redis_for_write(_feather); if(ret!=0) { + ret=-1; goto error_out; } } @@ -1743,23 +1850,9 @@ int Maat_cmd_commit(Maat_feather_t feather) } assert(serial_rule_idx==serial_rule_num); transection_success=0; - while(transection_successserver_time,_feather->logger); - if(transection_success==-1) - { - retry++; - } - else - { - break; - } - } - if(retry>5) - { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_command - ,"MAAT retry for %d times.", retry); - } + transection_success=exec_serial_rule(ctx, s_rule,serial_rule_num,_feather->server_time,_feather->logger); + assert(transection_success==serial_rule_num); + ret=_feather->cmd_q_cnt; _feather->cmd_acc_num+=_feather->cmd_q_cnt; error_out: p=_feather->cmd_qhead; diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp index a74a717..002f013 100644 --- a/src/entry/Maat_rule.cpp +++ b/src/entry/Maat_rule.cpp @@ -30,7 +30,7 @@ #include "stream_fuzzy_hash.h" #include "gram_index_engine.h" -int MAAT_FRAME_VERSION_2_2_20180727=1; +int MAAT_FRAME_VERSION_2_2_20180801=1; const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", "unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""}; diff --git a/test/maat_test.cpp b/test/maat_test.cpp index 059451b..28d76f0 100644 --- a/test/maat_test.cpp +++ b/test/maat_test.cpp @@ -606,7 +606,7 @@ void test_set_cmd_line(Maat_feather_t feather) ret=Maat_cmd_set_lines(feather, p_line,TEST_CMD_LINE_NUM, MAAT_OP_ADD); - assert(ret==0); + assert(ret>0); usleep(WAIT_FOR_EFFECTIVE_US); for(i=0;i0); return; @@ -680,7 +680,7 @@ int test_add_ip_command(Maat_feather_t feather,const char* region_table) { struct Maat_cmd_t* cmd=NULL; struct Maat_rule_t rule; - int config_id=0,timeout=2; + int config_id=0,timeout=4; long long version_before=0,version_after=0; struct Maat_region_t region; @@ -722,8 +722,9 @@ int test_add_ip_command(Maat_feather_t feather,const char* region_table) return 0; } Maat_free_cmd(cmd); - //TEST if the command go into effective. + cmd=NULL; + //TEST if the command go into effective. usleep(WAIT_FOR_EFFECTIVE_US); //waiting for commands go into effect ret=Maat_read_state(feather,MAAT_STATE_VERSION, &version_after, sizeof(version_after)); @@ -759,22 +760,29 @@ int test_add_ip_command(Maat_feather_t feather,const char* region_table) } } Maat_clean_status(&mid); - - usleep(timeout*1000*1000+WAIT_FOR_EFFECTIVE_US);//wait for commands expired. + //reset timeout. + cmd=Maat_create_cmd(&rule, 0); + cmd->expire_after=10; + ret=Maat_cmd(feather, cmd, MAAT_OP_RENEW_TIMEOUT); + assert(ret==1); + usleep(2*1000*1000+WAIT_FOR_EFFECTIVE_US);//wait for commands expired. + Maat_free_cmd(cmd); + cmd=NULL; ret=Maat_scan_proto_addr(feather,table_id,&ipv4_addr,6,&result,1, &mid,0); if(ret==0) { - printf("Test expire IP rule Success.\n"); + printf("Test RENEW timeout Failed.\n"); } else { - printf("Test expire IP rule Failed.\n"); + printf("Test RENEW timeout Success.\n"); } Maat_clean_status(&mid); return 0; } + int del_command(Maat_feather_t feather,int config_id) { struct Maat_cmd_t* cmd=NULL;