为了解决事物结束时,transaction version<maat_version的问题,使用redis list MAAT_TRANSACTION_xx存储配置更新状态,事务结束时再用lua script同步MAAT_UPDATE_STATUS。

This commit is contained in:
zhengchao
2019-02-22 18:42:31 +06:00
parent 31e2cacd7b
commit 2a8ced2f7d
2 changed files with 33 additions and 41 deletions

View File

@@ -1086,21 +1086,17 @@ long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule
} }
return ret; return ret;
} }
//parameters: 3 keys: MAAT_VERSION MAAT_UPDATE_STATUS MAAT_VERSION_TIMER, 2 args: TRANSACTION_VERSION SERVER_TIME //parameters: 4 keys: MAAT_VERSION MAAT_UPDATE_STATUS MAAT_VERSION_TIMER MAAT_TRANSACTION_xx, 1 args: SERVER_TIME
const char* lua_exec_done= const char* lua_exec_done=
"\ "local maat_version=redis.call(\'incrby\', KEYS[1], 1);"
local transaction_version=tonumber(ARGV[1])\n\ "local transaction=redis.call(\'lrange\', KEYS[4], 0, -1);"
local maat_version=redis.call(\'incrby\', KEYS[1], 1)\n\ "for k,v in pairs(transaction) do"
if(maat_version~=transaction_version) then\ " redis.call(\'zadd\', KEYS[2], maat_version, v);"
local affected=redis.call(\'zrangebyscore\', KEYS[2], transaction_version, transaction_version);\ "end;"
for k,v in pairs(affected) do\ "redis.call(\'del\', KEYS[4]);"
redis.call(\'zadd\', KEYS[2], maat_version, v);\ "redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);"
end;\ "return maat_version;";
end;\ redisReply* _exec_serial_rule_end(redisContext* ctx, const char* transaction_list, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt)
redis.call(\'zadd\', KEYS[3], ARGV[2], maat_version);\
return maat_version;\
";
redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt)
{ {
redisReply* data_reply=NULL; redisReply* data_reply=NULL;
if(renew_allowed==1) if(renew_allowed==1)
@@ -1109,13 +1105,13 @@ redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version
expect_reply[*cnt].srule_seq=-1; expect_reply[*cnt].srule_seq=-1;
(*cnt)++; (*cnt)++;
} }
if(maat_redis_version>0) if(strlen(transaction_list)>0)
{ {
data_reply=_wrap_redisCommand(ctx, "eval %s 3 MAAT_VERSION %s %s %lld %lld", data_reply=_wrap_redisCommand(ctx, "eval %s 4 MAAT_VERSION %s %s %s %lld",
lua_exec_done, lua_exec_done,
mr_status_sset, mr_status_sset,
mr_version_sset, mr_version_sset,
maat_redis_version, transaction_list,
server_time); server_time);
freeReplyObject(data_reply); freeReplyObject(data_reply);
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
@@ -1124,7 +1120,8 @@ redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version
data_reply=_wrap_redisCommand(ctx,"EXEC"); data_reply=_wrap_redisCommand(ctx,"EXEC");
return data_reply; return data_reply;
} }
void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply* expect_reply, unsigned int *cnt, int offset,int renew_allowed)
void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply* expect_reply, unsigned int *cnt, int offset,int renew_allowed)
{ {
redisReply* data_reply=NULL; redisReply* data_reply=NULL;
unsigned int append_cmd_cnt=0, i=0; unsigned int append_cmd_cnt=0, i=0;
@@ -1142,13 +1139,11 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_
(*cnt)++; (*cnt)++;
append_cmd_cnt++; append_cmd_cnt++;
//Allowing add duplicated members for rule id recycling. //Allowing add duplicated members for rule id recycling.
redisAppendCommand(ctx,"ZADD %s %lld ADD,%s,%d", redisAppendCommand(ctx,"RPUSH %s ADD,%s,%d",
mr_status_sset, transaction_list,
version,
s_rule[i].table_name, s_rule[i].table_name,
s_rule[i].rule_id); s_rule[i].rule_id);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
(*cnt)++; (*cnt)++;
append_cmd_cnt++; append_cmd_cnt++;
if(s_rule[i].timeout>0) if(s_rule[i].timeout>0)
@@ -1201,13 +1196,11 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_
append_cmd_cnt++; append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements. //NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"ZADD %s %d DEL,%s,%d", redisAppendCommand(ctx,"RPUSH %s DEL,%s,%d",
mr_status_sset, transaction_list,
version,
s_rule[i].table_name, s_rule[i].table_name,
s_rule[i].rule_id); s_rule[i].rule_id);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
(*cnt)++; (*cnt)++;
append_cmd_cnt++; append_cmd_cnt++;
@@ -1268,7 +1261,7 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
unsigned int multi_cmd_cnt=0, success_cnt=0; unsigned int multi_cmd_cnt=0, success_cnt=0;
const int MAX_REDIS_OP_PER_SRULE=8; const int MAX_REDIS_OP_PER_SRULE=8;
unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end() unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end()
char transaction_list[MAX_TABLE_NAME_LEN*2]={0};
struct expected_reply *expected_reply=(struct expected_reply*)calloc(sizeof(struct expected_reply), max_multi_cmd_num); struct expected_reply *expected_reply=(struct expected_reply*)calloc(sizeof(struct expected_reply), max_multi_cmd_num);
long long transaction_version=0, transaction_finished_version=0; long long transaction_version=0, transaction_finished_version=0;
int renew_num=0,ret=0; int renew_num=0,ret=0;
@@ -1286,14 +1279,18 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
success_cnt=-1; success_cnt=-1;
goto error_out; goto error_out;
} }
if(transaction_version>0)
{
snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld", transaction_version);
}
while(success_cnt<serial_rule_num) while(success_cnt<serial_rule_num)
{ {
batch_cnt=MIN(serial_rule_num-success_cnt, max_redis_batch); batch_cnt=MIN(serial_rule_num-success_cnt, max_redis_batch);
_exec_serial_rule(ctx, transaction_version, s_rule+success_cnt, batch_cnt, expected_reply, &multi_cmd_cnt,success_cnt,renew_allowed); _exec_serial_rule(ctx, transaction_list, s_rule+success_cnt, batch_cnt, expected_reply, &multi_cmd_cnt,success_cnt,renew_allowed);
assert(multi_cmd_cnt<max_multi_cmd_num); assert(multi_cmd_cnt<max_multi_cmd_num);
success_cnt+=batch_cnt; success_cnt+=batch_cnt;
} }
transaction_reply=_exec_serial_rule_end(ctx, transaction_version, server_time, renew_allowed, expected_reply, &multi_cmd_cnt); transaction_reply=_exec_serial_rule_end(ctx, transaction_list, server_time, renew_allowed, expected_reply, &multi_cmd_cnt);
if(1==mr_transaction_success(transaction_reply)) if(1==mr_transaction_success(transaction_reply))
{ {
assert(transaction_reply->elements==multi_cmd_cnt); assert(transaction_reply->elements==multi_cmd_cnt);
@@ -1323,14 +1320,11 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
if(transaction_version>0) if(transaction_version>0)
{ {
transaction_finished_version=read_redis_integer(transaction_reply->element[multi_cmd_cnt-1]); transaction_finished_version=read_redis_integer(transaction_reply->element[multi_cmd_cnt-1]);
if(transaction_finished_version!=transaction_version) MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_command,
{ "Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld ",
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_command,
"Race condition of redis transaction (MAAT_PRE_VER != MAAT_VERSION) : %lld != %lld , need manually intervention when occurrent frequently.",
transaction_version, transaction_version,
transaction_finished_version); transaction_finished_version);
} }
}
freeReplyObject(transaction_reply); freeReplyObject(transaction_reply);

View File

@@ -1271,8 +1271,6 @@ protected:
Maat_cmd_flushDB(_shared_feather); Maat_cmd_flushDB(_shared_feather);
Maat_initiate_feather(_shared_feather); Maat_initiate_feather(_shared_feather);
//For simulating race condition.
Maat_cmd_incrby(_shared_feather, "MAAT_PRE_VER", 1);
} }
static void TearDownTestCase() static void TearDownTestCase()
{ {