diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index 51bc977..cbeebed 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -1086,21 +1086,17 @@ long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule } return ret; } -//parameters: 3 keys: MAAT_VERSION MAAT_UPDATE_STATUS MAAT_VERSION_TIMER, 2 args: TRANSACTION_VERSION SERVER_TIME +//parameters: 4 keys: MAAT_VERSION MAAT_UPDATE_STATUS MAAT_VERSION_TIMER MAAT_TRANSACTION_xx, 1 args: SERVER_TIME const char* lua_exec_done= -"\ -local transaction_version=tonumber(ARGV[1])\n\ -local maat_version=redis.call(\'incrby\', KEYS[1], 1)\n\ -if(maat_version~=transaction_version) then\ - local affected=redis.call(\'zrangebyscore\', KEYS[2], transaction_version, transaction_version);\ - for k,v in pairs(affected) do\ - redis.call(\'zadd\', KEYS[2], maat_version, v);\ - end;\ -end;\ -redis.call(\'zadd\', KEYS[3], ARGV[2], maat_version);\ -return maat_version;\ -"; -redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt) +"local maat_version=redis.call(\'incrby\', KEYS[1], 1);" +"local transaction=redis.call(\'lrange\', KEYS[4], 0, -1);" +"for k,v in pairs(transaction) do" +" redis.call(\'zadd\', KEYS[2], maat_version, v);" +"end;" +"redis.call(\'del\', KEYS[4]);" +"redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);" +"return maat_version;"; +redisReply* _exec_serial_rule_end(redisContext* ctx, const char* transaction_list, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt) { redisReply* data_reply=NULL; if(renew_allowed==1) @@ -1109,13 +1105,13 @@ redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version expect_reply[*cnt].srule_seq=-1; (*cnt)++; } - if(maat_redis_version>0) + if(strlen(transaction_list)>0) { - data_reply=_wrap_redisCommand(ctx, "eval %s 3 MAAT_VERSION %s %s %lld %lld", + data_reply=_wrap_redisCommand(ctx, "eval %s 4 MAAT_VERSION %s %s %s %lld", lua_exec_done, mr_status_sset, mr_version_sset, - maat_redis_version, + transaction_list, server_time); freeReplyObject(data_reply); expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); @@ -1124,7 +1120,8 @@ redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version data_reply=_wrap_redisCommand(ctx,"EXEC"); return data_reply; } -void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply* expect_reply, unsigned int *cnt, int offset,int renew_allowed) + +void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply* expect_reply, unsigned int *cnt, int offset,int renew_allowed) { redisReply* data_reply=NULL; unsigned int append_cmd_cnt=0, i=0; @@ -1142,13 +1139,11 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_ (*cnt)++; append_cmd_cnt++; //Allowing add duplicated members for rule id recycling. - redisAppendCommand(ctx,"ZADD %s %lld ADD,%s,%d", - mr_status_sset, - version, + redisAppendCommand(ctx,"RPUSH %s ADD,%s,%d", + transaction_list, s_rule[i].table_name, s_rule[i].rule_id); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0); + expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); (*cnt)++; append_cmd_cnt++; if(s_rule[i].timeout>0) @@ -1201,13 +1196,11 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_ append_cmd_cnt++; //NX: Don't update already exisiting elements. Always add new elements. - redisAppendCommand(ctx,"ZADD %s %d DEL,%s,%d", - mr_status_sset, - version, + redisAppendCommand(ctx,"RPUSH %s DEL,%s,%d", + transaction_list, s_rule[i].table_name, s_rule[i].rule_id); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0); + expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); (*cnt)++; append_cmd_cnt++; @@ -1268,7 +1261,7 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in unsigned int multi_cmd_cnt=0, success_cnt=0; const int MAX_REDIS_OP_PER_SRULE=8; unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end() - + char transaction_list[MAX_TABLE_NAME_LEN*2]={0}; struct expected_reply *expected_reply=(struct expected_reply*)calloc(sizeof(struct expected_reply), max_multi_cmd_num); long long transaction_version=0, transaction_finished_version=0; int renew_num=0,ret=0; @@ -1286,14 +1279,18 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in success_cnt=-1; goto error_out; } + if(transaction_version>0) + { + snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld", transaction_version); + } while(success_cntelements==multi_cmd_cnt); @@ -1323,13 +1320,10 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in if(transaction_version>0) { transaction_finished_version=read_redis_integer(transaction_reply->element[multi_cmd_cnt-1]); - if(transaction_finished_version!=transaction_version) - { - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_command, - "Race condition of redis transaction (MAAT_PRE_VER != MAAT_VERSION) : %lld != %lld , need manually intervention when occurrent frequently.", - transaction_version, - transaction_finished_version); - } + MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_command, + "Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld ", + transaction_version, + transaction_finished_version); } freeReplyObject(transaction_reply); @@ -1864,7 +1858,7 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx* m } } start(new_version,update_type,u_para); - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Start %s update: %lld->%lld (%d entries).", + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Start %s update: %lld -> %lld (%d entries).", update_type==CM_UPDATE_TYPE_INC?"INC":"FULL",version,new_version,rule_num); for(i=0;i