|
|
|
|
@@ -591,7 +591,7 @@ int get_rm_key_list(redisContext *c, long long instance_version, long long desir
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
|
|
|
|
|
"Inc Update form instance_version %lld to %lld (%d entries).",instance_version,target_version,rule_num);
|
|
|
|
|
"Inc Update from instance_version %lld to %lld (%d entries).",instance_version,target_version,rule_num);
|
|
|
|
|
*list=s_rule_array;
|
|
|
|
|
*update_type=CM_UPDATE_TYPE_INC;
|
|
|
|
|
*new_version=target_version;
|
|
|
|
|
@@ -1056,7 +1056,7 @@ int mr_operation_success(redisReply* actual_reply, struct expected_reply* expect
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *maat_redis_version)
|
|
|
|
|
long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *transaction_version)
|
|
|
|
|
{
|
|
|
|
|
int ret=-1;
|
|
|
|
|
redisReply* data_reply=NULL;
|
|
|
|
|
@@ -1071,9 +1071,9 @@ long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule
|
|
|
|
|
if(rule_num>renew_rule_num)
|
|
|
|
|
{
|
|
|
|
|
data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1");
|
|
|
|
|
*maat_redis_version=read_redis_integer(data_reply);
|
|
|
|
|
*transaction_version=read_redis_integer(data_reply);
|
|
|
|
|
freeReplyObject(data_reply);
|
|
|
|
|
if(*maat_redis_version<0)
|
|
|
|
|
if(*transaction_version<0)
|
|
|
|
|
{
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
@@ -1086,7 +1086,17 @@ long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule
|
|
|
|
|
}
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt)
|
|
|
|
|
//parameters: 4 keys: MAAT_VERSION MAAT_UPDATE_STATUS MAAT_VERSION_TIMER MAAT_TRANSACTION_xx, 1 args: SERVER_TIME
|
|
|
|
|
const char* lua_exec_done=
|
|
|
|
|
"local maat_version=redis.call(\'incrby\', KEYS[1], 1);"
|
|
|
|
|
"local transaction=redis.call(\'lrange\', KEYS[4], 0, -1);"
|
|
|
|
|
"for k,v in pairs(transaction) do"
|
|
|
|
|
" redis.call(\'zadd\', KEYS[2], maat_version, v);"
|
|
|
|
|
"end;"
|
|
|
|
|
"redis.call(\'del\', KEYS[4]);"
|
|
|
|
|
"redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);"
|
|
|
|
|
"return maat_version;";
|
|
|
|
|
redisReply* _exec_serial_rule_end(redisContext* ctx, const char* transaction_list, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt)
|
|
|
|
|
{
|
|
|
|
|
redisReply* data_reply=NULL;
|
|
|
|
|
if(renew_allowed==1)
|
|
|
|
|
@@ -1095,21 +1105,23 @@ redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version
|
|
|
|
|
expect_reply[*cnt].srule_seq=-1;
|
|
|
|
|
(*cnt)++;
|
|
|
|
|
}
|
|
|
|
|
if(maat_redis_version>0)
|
|
|
|
|
if(strlen(transaction_list)>0)
|
|
|
|
|
{
|
|
|
|
|
data_reply=_wrap_redisCommand(ctx,"ZADD %s NX %d %d",mr_version_sset,server_time,maat_redis_version);
|
|
|
|
|
data_reply=_wrap_redisCommand(ctx, "eval %s 4 MAAT_VERSION %s %s %s %lld",
|
|
|
|
|
lua_exec_done,
|
|
|
|
|
mr_status_sset,
|
|
|
|
|
mr_version_sset,
|
|
|
|
|
transaction_list,
|
|
|
|
|
server_time);
|
|
|
|
|
freeReplyObject(data_reply);
|
|
|
|
|
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
|
|
|
|
|
(*cnt)++;
|
|
|
|
|
data_reply=_wrap_redisCommand(ctx,"INCRBY MAAT_VERSION 1");
|
|
|
|
|
freeReplyObject(data_reply);
|
|
|
|
|
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
|
|
|
|
|
(*cnt)++;
|
|
|
|
|
}
|
|
|
|
|
data_reply=_wrap_redisCommand(ctx,"EXEC");
|
|
|
|
|
return data_reply;
|
|
|
|
|
}
|
|
|
|
|
void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply* expect_reply, unsigned int *cnt, int offset,int renew_allowed)
|
|
|
|
|
|
|
|
|
|
void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply* expect_reply, unsigned int *cnt, int offset,int renew_allowed)
|
|
|
|
|
{
|
|
|
|
|
redisReply* data_reply=NULL;
|
|
|
|
|
unsigned int append_cmd_cnt=0, i=0;
|
|
|
|
|
@@ -1127,13 +1139,11 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_
|
|
|
|
|
(*cnt)++;
|
|
|
|
|
append_cmd_cnt++;
|
|
|
|
|
//Allowing add duplicated members for rule id recycling.
|
|
|
|
|
redisAppendCommand(ctx,"ZADD %s %lld ADD,%s,%d",
|
|
|
|
|
mr_status_sset,
|
|
|
|
|
version,
|
|
|
|
|
redisAppendCommand(ctx,"RPUSH %s ADD,%s,%d",
|
|
|
|
|
transaction_list,
|
|
|
|
|
s_rule[i].table_name,
|
|
|
|
|
s_rule[i].rule_id);
|
|
|
|
|
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
|
|
|
|
|
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
|
|
|
|
|
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
|
|
|
|
|
(*cnt)++;
|
|
|
|
|
append_cmd_cnt++;
|
|
|
|
|
if(s_rule[i].timeout>0)
|
|
|
|
|
@@ -1186,13 +1196,11 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_
|
|
|
|
|
append_cmd_cnt++;
|
|
|
|
|
|
|
|
|
|
//NX: Don't update already exisiting elements. Always add new elements.
|
|
|
|
|
redisAppendCommand(ctx,"ZADD %s %d DEL,%s,%d",
|
|
|
|
|
mr_status_sset,
|
|
|
|
|
version,
|
|
|
|
|
redisAppendCommand(ctx,"RPUSH %s DEL,%s,%d",
|
|
|
|
|
transaction_list,
|
|
|
|
|
s_rule[i].table_name,
|
|
|
|
|
s_rule[i].rule_id);
|
|
|
|
|
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
|
|
|
|
|
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
|
|
|
|
|
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
|
|
|
|
|
(*cnt)++;
|
|
|
|
|
append_cmd_cnt++;
|
|
|
|
|
|
|
|
|
|
@@ -1253,9 +1261,9 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
|
|
|
|
|
unsigned int multi_cmd_cnt=0, success_cnt=0;
|
|
|
|
|
const int MAX_REDIS_OP_PER_SRULE=8;
|
|
|
|
|
unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end()
|
|
|
|
|
|
|
|
|
|
char transaction_list[MAX_TABLE_NAME_LEN*2]={0};
|
|
|
|
|
struct expected_reply *expected_reply=(struct expected_reply*)calloc(sizeof(struct expected_reply), max_multi_cmd_num);
|
|
|
|
|
long long new_version=0;
|
|
|
|
|
long long transaction_version=0, transaction_finished_version=0;
|
|
|
|
|
int renew_num=0,ret=0;
|
|
|
|
|
for(i=0;i<serial_rule_num;i++)
|
|
|
|
|
{
|
|
|
|
|
@@ -1265,20 +1273,24 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ret=_exec_serial_rule_begin(ctx,serial_rule_num,renew_num, &renew_allowed, &new_version);
|
|
|
|
|
if(ret!=0)//Preconditions of transaction is not qualified.
|
|
|
|
|
ret=_exec_serial_rule_begin(ctx, serial_rule_num, renew_num, &renew_allowed, &transaction_version);
|
|
|
|
|
if(ret!=0)//Preconditions for transaction are not satisfied.
|
|
|
|
|
{
|
|
|
|
|
success_cnt=-1;
|
|
|
|
|
goto error_out;
|
|
|
|
|
}
|
|
|
|
|
if(transaction_version>0)
|
|
|
|
|
{
|
|
|
|
|
snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld", transaction_version);
|
|
|
|
|
}
|
|
|
|
|
while(success_cnt<serial_rule_num)
|
|
|
|
|
{
|
|
|
|
|
batch_cnt=MIN(serial_rule_num-success_cnt, max_redis_batch);
|
|
|
|
|
_exec_serial_rule(ctx,new_version,s_rule+success_cnt,batch_cnt,expected_reply, &multi_cmd_cnt,success_cnt,renew_allowed);
|
|
|
|
|
_exec_serial_rule(ctx, transaction_list, s_rule+success_cnt, batch_cnt, expected_reply, &multi_cmd_cnt,success_cnt,renew_allowed);
|
|
|
|
|
assert(multi_cmd_cnt<max_multi_cmd_num);
|
|
|
|
|
success_cnt+=batch_cnt;
|
|
|
|
|
}
|
|
|
|
|
transaction_reply=_exec_serial_rule_end(ctx,new_version,server_time, renew_allowed, expected_reply, &multi_cmd_cnt);
|
|
|
|
|
transaction_reply=_exec_serial_rule_end(ctx, transaction_list, server_time, renew_allowed, expected_reply, &multi_cmd_cnt);
|
|
|
|
|
if(1==mr_transaction_success(transaction_reply))
|
|
|
|
|
{
|
|
|
|
|
assert(transaction_reply->elements==multi_cmd_cnt);
|
|
|
|
|
@@ -1293,10 +1305,10 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
rule_seq=expected_reply[i].srule_seq;
|
|
|
|
|
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
|
|
|
|
|
,"%s %s %d failed, rule id maybe conflict or not exist."
|
|
|
|
|
,mr_op_str[s_rule[rule_seq].op]
|
|
|
|
|
,s_rule[rule_seq].table_name,s_rule[rule_seq].rule_id);
|
|
|
|
|
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_command,
|
|
|
|
|
"%s %s %d failed, rule id maybe conflict or not exist.",
|
|
|
|
|
mr_op_str[s_rule[rule_seq].op],
|
|
|
|
|
s_rule[rule_seq].table_name,s_rule[rule_seq].rule_id);
|
|
|
|
|
success_cnt--;
|
|
|
|
|
last_failed=rule_seq;
|
|
|
|
|
}
|
|
|
|
|
@@ -1304,7 +1316,16 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
success_cnt=-1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if(transaction_version>0)
|
|
|
|
|
{
|
|
|
|
|
transaction_finished_version=read_redis_integer(transaction_reply->element[multi_cmd_cnt-1]);
|
|
|
|
|
MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_command,
|
|
|
|
|
"Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld ",
|
|
|
|
|
transaction_version,
|
|
|
|
|
transaction_finished_version);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
freeReplyObject(transaction_reply);
|
|
|
|
|
|
|
|
|
|
error_out:
|
|
|
|
|
@@ -1837,7 +1858,7 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx* m
|
|
|
|
|
get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0, logger);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
start(new_version,update_type,u_para);
|
|
|
|
|
start(new_version,update_type,u_para);
|
|
|
|
|
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Start %s update: %lld -> %lld (%d entries).",
|
|
|
|
|
update_type==CM_UPDATE_TYPE_INC?"INC":"FULL",version,new_version,rule_num);
|
|
|
|
|
for(i=0;i<rule_num;i++)
|
|
|
|
|
|