Command支持淘汰时间重置操作。
This commit is contained in:
@@ -8,6 +8,7 @@
|
||||
#include <errno.h>
|
||||
#include <pthread.h>
|
||||
#include <assert.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#define maat_redis_monitor (module_name_str("MAAT_REDIS_MONITOR"))
|
||||
#define maat_command (module_name_str("MAAT_COMMAND"))
|
||||
@@ -18,8 +19,9 @@ const char* rm_expire_sset="MAAT_EXPIRE_TIMER";
|
||||
const char* rm_label_sset="MAAT_LABEL_INDEX";
|
||||
const char* rm_version_sset="MAAT_VERSION_TIMER";
|
||||
const char* rm_expire_lock="EXPIRE_OP_LOCK";
|
||||
const long rm_expire_lock_timeout=300*1000;
|
||||
const static int MAAT_REDIS_SYNC_TIME=30*60;
|
||||
|
||||
const char* rm_op_str[]={"DEL","ADD","RENEW_TIMEOUT"};
|
||||
|
||||
|
||||
struct _Maat_cmd_inner_t
|
||||
@@ -532,11 +534,11 @@ int get_rm_key_list(redisContext *c, long long instance_version, long long desir
|
||||
do{
|
||||
target_version++;
|
||||
rule_num=get_inc_key_list(instance_version, target_version, c, &s_rule_array,logger);
|
||||
if(ret>0)
|
||||
if(rule_num>0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
else if(ret<0)
|
||||
else if(rule_num<0)
|
||||
{
|
||||
goto FULL_UPDATE;
|
||||
}
|
||||
@@ -862,7 +864,7 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
|
||||
}
|
||||
else
|
||||
{
|
||||
set_serial_rule(list+rule_num,MAAT_OP_DEL,cmd->compile.config_id,cmd->label_id,feather->compile_tn,NULL,timeout);
|
||||
set_serial_rule(list+rule_num,op,cmd->compile.config_id,cmd->label_id,feather->compile_tn,NULL,timeout);
|
||||
}
|
||||
rule_num++;
|
||||
for(i=0;i<cmd->group_num;i++)
|
||||
@@ -881,7 +883,7 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
|
||||
}
|
||||
else
|
||||
{
|
||||
set_serial_rule(list+rule_num,MAAT_OP_DEL,p_group->group_id,0,feather->group_tn,NULL,0);
|
||||
set_serial_rule(list+rule_num,op,p_group->group_id,0,feather->group_tn,NULL,timeout);
|
||||
}
|
||||
rule_num++;
|
||||
if(p_group->regions==NULL)//group reuse.
|
||||
@@ -904,8 +906,8 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
|
||||
}
|
||||
else
|
||||
{
|
||||
set_serial_rule(list+rule_num,MAAT_OP_DEL
|
||||
,p_region->region_id,0,p_region->table_name,NULL,0);
|
||||
set_serial_rule(list+rule_num,op
|
||||
,p_region->region_id,0,p_region->table_name,NULL,timeout);
|
||||
|
||||
}
|
||||
rule_num++;
|
||||
@@ -925,118 +927,215 @@ int mr_transaction_success(redisReply* data_reply)
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
int mr_operation_success(redisReply* data_reply)
|
||||
int mr_operation_success(redisReply* actual_reply, redisReply* expected_reply)
|
||||
{
|
||||
if(data_reply->type==REDIS_REPLY_INTEGER&&data_reply->integer==0)
|
||||
if(expected_reply->type!=actual_reply->type)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
if(expected_reply->type==REDIS_REPLY_INTEGER&&expected_reply->integer!=actual_reply->integer)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
long long _exec_serial_rule_begin(redisContext* ctx)
|
||||
{
|
||||
long long maat_redis_version=0;
|
||||
redisReply* data_reply=NULL;
|
||||
data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1");
|
||||
maat_redis_version=read_redis_integer(data_reply);
|
||||
freeReplyObject(data_reply);
|
||||
data_reply=_wrap_redisCommand(ctx,"MULTI");
|
||||
return maat_redis_version;
|
||||
}
|
||||
redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time)
|
||||
int redlock_try_lock(redisContext *ctx, const char* lock_name, long long expire)
|
||||
{
|
||||
redisReply* reply=NULL;
|
||||
int ret=0;
|
||||
reply=_wrap_redisCommand(ctx,"SET %s locked NX PX %lld", lock_name, expire);
|
||||
if(reply->type==REDIS_REPLY_NIL)
|
||||
{
|
||||
ret=0;
|
||||
}
|
||||
else
|
||||
{
|
||||
ret=1;
|
||||
}
|
||||
freeReplyObject(reply);
|
||||
return ret;
|
||||
}
|
||||
void redlock_unlock(redisContext * ctx, const char * lock_name)
|
||||
{
|
||||
redisReply* reply=NULL;
|
||||
reply=_wrap_redisCommand(ctx,"DEL %s", lock_name);
|
||||
freeReplyObject(reply);
|
||||
|
||||
}
|
||||
struct expected_reply_t
|
||||
{
|
||||
int srule_seq;
|
||||
redisReply reply;
|
||||
};
|
||||
|
||||
long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *maat_redis_version)
|
||||
{
|
||||
int ret=0;
|
||||
redisReply* data_reply=NULL;
|
||||
if(renew_rule_num>0)
|
||||
{
|
||||
while(0==redlock_try_lock(ctx, rm_expire_lock, rm_expire_lock_timeout))
|
||||
{
|
||||
usleep(1000);
|
||||
}
|
||||
*renew_allowed=1;
|
||||
}
|
||||
if(rule_num>renew_rule_num)
|
||||
{
|
||||
data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1");
|
||||
*maat_redis_version=read_redis_integer(data_reply);
|
||||
freeReplyObject(data_reply);
|
||||
}
|
||||
if(*renew_allowed==1||rule_num>renew_rule_num)
|
||||
{
|
||||
data_reply=_wrap_redisCommand(ctx,"MULTI");
|
||||
freeReplyObject(data_reply);
|
||||
ret=1;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time, int renew_allowed)
|
||||
{
|
||||
redisReply* data_reply=NULL;
|
||||
|
||||
data_reply=_wrap_redisCommand(ctx,"ZADD %s NX %d %d",rm_version_sset,server_time,maat_redis_version);
|
||||
freeReplyObject(data_reply);
|
||||
data_reply=_wrap_redisCommand(ctx,"INCRBY MAAT_VERSION 1");
|
||||
freeReplyObject(data_reply);
|
||||
if(renew_allowed==1)
|
||||
{
|
||||
redlock_unlock(ctx, rm_expire_lock);
|
||||
}
|
||||
if(maat_redis_version>0)
|
||||
{
|
||||
data_reply=_wrap_redisCommand(ctx,"ZADD %s NX %d %d",rm_version_sset,server_time,maat_redis_version);
|
||||
freeReplyObject(data_reply);
|
||||
data_reply=_wrap_redisCommand(ctx,"INCRBY MAAT_VERSION 1");
|
||||
freeReplyObject(data_reply);
|
||||
}
|
||||
data_reply=_wrap_redisCommand(ctx,"EXEC");
|
||||
return data_reply;
|
||||
}
|
||||
void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_t* s_rule, int rule_num, int* multi_cmd_seq, unsigned int *cnt, int offset)
|
||||
void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_t* s_rule, int rule_num, struct expected_reply_t* expect_reply, unsigned int *cnt, int offset,int renew_allowed)
|
||||
{
|
||||
int i=0;
|
||||
redisReply* data_reply=NULL;
|
||||
int append_cmd_cnt=0;
|
||||
for(i=0;i<rule_num;i++)
|
||||
{
|
||||
if(s_rule[i].op==MAAT_OP_ADD)
|
||||
switch(s_rule[i].op)
|
||||
{
|
||||
redisAppendCommand(ctx,"SET %s:%s,%d %s",rm_key_prefix[MAAT_OP_ADD]
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id
|
||||
,s_rule[i].table_line);
|
||||
multi_cmd_seq[(*cnt)++]=i+offset;
|
||||
append_cmd_cnt++;
|
||||
//NX: Don't update already exisiting elements. Always add new elements.
|
||||
redisAppendCommand(ctx,"ZADD %s NX %lld ADD,%s,%d",rm_status_sset
|
||||
,version
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id);
|
||||
multi_cmd_seq[(*cnt)++]=i+offset;
|
||||
append_cmd_cnt++;
|
||||
if(s_rule[i].timeout>0)
|
||||
{
|
||||
redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset
|
||||
case MAAT_OP_ADD:
|
||||
redisAppendCommand(ctx,"SET %s:%s,%d %s",rm_key_prefix[MAAT_OP_ADD]
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id
|
||||
,s_rule[i].table_line);
|
||||
expect_reply[*cnt].srule_seq=i+offset;
|
||||
expect_reply[*cnt].reply.type=REDIS_REPLY_STATUS;
|
||||
(*cnt)++;
|
||||
append_cmd_cnt++;
|
||||
//NX: Don't update already exisiting elements. Always add new elements.
|
||||
redisAppendCommand(ctx,"ZADD %s NX %lld ADD,%s,%d",rm_status_sset
|
||||
,version
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id);
|
||||
expect_reply[*cnt].srule_seq=i+offset;
|
||||
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
|
||||
expect_reply[*cnt].reply.integer=1;
|
||||
(*cnt)++;
|
||||
append_cmd_cnt++;
|
||||
if(s_rule[i].timeout>0)
|
||||
{
|
||||
redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset
|
||||
,s_rule[i].timeout
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id);
|
||||
expect_reply[*cnt].srule_seq=i+offset;
|
||||
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
|
||||
expect_reply[*cnt].reply.integer=1;
|
||||
(*cnt)++;
|
||||
append_cmd_cnt++;
|
||||
}
|
||||
if(s_rule[i].label_id>0)
|
||||
{
|
||||
redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset
|
||||
,s_rule[i].label_id
|
||||
,s_rule[i].rule_id);
|
||||
expect_reply[*cnt].srule_seq=i+offset;
|
||||
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
|
||||
expect_reply[*cnt].reply.integer=1;
|
||||
(*cnt)++;
|
||||
|
||||
append_cmd_cnt++;
|
||||
}
|
||||
break;
|
||||
case MAAT_OP_DEL:
|
||||
redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d"
|
||||
,rm_key_prefix[MAAT_OP_ADD]
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id
|
||||
,rm_key_prefix[MAAT_OP_DEL]
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id
|
||||
);
|
||||
expect_reply[*cnt].srule_seq=i+offset;
|
||||
expect_reply[*cnt].reply.type=REDIS_REPLY_STATUS;
|
||||
(*cnt)++;
|
||||
append_cmd_cnt++;
|
||||
|
||||
redisAppendCommand(ctx,"EXPIRE %s:%s,%d %d",rm_key_prefix[MAAT_OP_DEL]
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id
|
||||
,MAAT_REDIS_SYNC_TIME);
|
||||
expect_reply[*cnt].srule_seq=i+offset;
|
||||
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
|
||||
expect_reply[*cnt].reply.integer=1;
|
||||
(*cnt)++;
|
||||
append_cmd_cnt++;
|
||||
|
||||
//NX: Don't update already exisiting elements. Always add new elements.
|
||||
redisAppendCommand(ctx,"ZADD %s NX %d DEL,%s,%d",rm_status_sset
|
||||
,version
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id);
|
||||
expect_reply[*cnt].srule_seq=i+offset;
|
||||
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
|
||||
expect_reply[*cnt].reply.integer=1;
|
||||
(*cnt)++;
|
||||
append_cmd_cnt++;
|
||||
|
||||
// Try to remove from expiration sorted set, no matter wheather it exists or not.
|
||||
redisAppendCommand(ctx,"ZREM %s %s,%d",rm_expire_sset
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id);
|
||||
expect_reply[*cnt].srule_seq=-1;
|
||||
(*cnt)++;
|
||||
append_cmd_cnt++;
|
||||
|
||||
redisAppendCommand(ctx,"ZREM %s %d",rm_label_sset
|
||||
,s_rule[i].rule_id);
|
||||
expect_reply[*cnt].srule_seq=-1;
|
||||
(*cnt)++;
|
||||
append_cmd_cnt++;
|
||||
break;
|
||||
case MAAT_OP_RENEW_TIMEOUT:
|
||||
if(renew_allowed!=1)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
//s_rule[i].timeout>0 was checked by caller.
|
||||
//XX: Only update elements that already exist. Never add elements.
|
||||
redisAppendCommand(ctx,"ZADD %s XX %lld %s,%d",rm_expire_sset
|
||||
,s_rule[i].timeout
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id);
|
||||
multi_cmd_seq[(*cnt)++]=i+offset;
|
||||
expect_reply[*cnt].srule_seq=i+offset;
|
||||
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
|
||||
expect_reply[*cnt].reply.integer=0;
|
||||
(*cnt)++;
|
||||
append_cmd_cnt++;
|
||||
}
|
||||
if(s_rule[i].label_id>0)
|
||||
{
|
||||
redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset
|
||||
,s_rule[i].label_id
|
||||
,s_rule[i].rule_id);
|
||||
multi_cmd_seq[(*cnt)++]=i+offset;
|
||||
append_cmd_cnt++;
|
||||
}
|
||||
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d"
|
||||
,rm_key_prefix[MAAT_OP_ADD]
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id
|
||||
,rm_key_prefix[MAAT_OP_DEL]
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id
|
||||
);
|
||||
multi_cmd_seq[(*cnt)++]=i+offset;
|
||||
append_cmd_cnt++;
|
||||
|
||||
redisAppendCommand(ctx,"EXPIRE %s:%s,%d %d",rm_key_prefix[MAAT_OP_DEL]
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id
|
||||
,MAAT_REDIS_SYNC_TIME);
|
||||
multi_cmd_seq[(*cnt)++]=i+offset;
|
||||
append_cmd_cnt++;
|
||||
|
||||
//NX: Don't update already exisiting elements. Always add new elements.
|
||||
redisAppendCommand(ctx,"ZADD %s NX %d DEL,%s,%d",rm_status_sset
|
||||
,version
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id);
|
||||
multi_cmd_seq[(*cnt)++]=i+offset;
|
||||
append_cmd_cnt++;
|
||||
|
||||
// Try to remove from expiration sorted set, no matter wheather it exists or not.
|
||||
redisAppendCommand(ctx,"ZREM %s %s,%d",rm_expire_sset
|
||||
,s_rule[i].table_name
|
||||
,s_rule[i].rule_id);
|
||||
multi_cmd_seq[(*cnt)++]=-1;
|
||||
append_cmd_cnt++;
|
||||
|
||||
redisAppendCommand(ctx,"ZREM %s %d",rm_label_sset
|
||||
,s_rule[i].rule_id);
|
||||
multi_cmd_seq[(*cnt)++]=-1;
|
||||
append_cmd_cnt++;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
for(i=0;i<append_cmd_cnt;i++)
|
||||
{
|
||||
@@ -1045,52 +1144,88 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
#define MAX_REDIS_OP_PER_SRULE 8
|
||||
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger)
|
||||
{
|
||||
int max_redis_batch=1*1024,batch_cnt=0;
|
||||
int success_cnt=0,j=0;
|
||||
int success_cnt=0,j=0,renew_allowed=0,last_failed=-1;
|
||||
redisReply*transaction_reply=NULL,*p=NULL;
|
||||
unsigned int i=0;
|
||||
|
||||
unsigned int multi_cmd_cnt=0;
|
||||
unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end()
|
||||
int *multi_cmd_seq=(int*)calloc(sizeof(int), max_multi_cmd_num);
|
||||
|
||||
|
||||
struct expected_reply_t *expected_reply=(struct expected_reply_t*)calloc(sizeof(struct expected_reply_t), max_multi_cmd_num);
|
||||
long long new_version=0;
|
||||
new_version=_exec_serial_rule_begin(ctx);
|
||||
int renew_num=0,ret=0;
|
||||
for(i=0;i<(unsigned int)serial_rule_num;i++)
|
||||
{
|
||||
if(s_rule[i].op==MAAT_OP_RENEW_TIMEOUT)
|
||||
{
|
||||
renew_num++;
|
||||
}
|
||||
}
|
||||
|
||||
ret=_exec_serial_rule_begin(ctx,serial_rule_num,renew_num, &renew_allowed, &new_version);
|
||||
if(ret!=1)//Preconditions of transaction is not qualified.
|
||||
{
|
||||
goto error_out;
|
||||
}
|
||||
while(success_cnt<serial_rule_num)
|
||||
{
|
||||
batch_cnt=MIN(serial_rule_num-success_cnt,max_redis_batch);
|
||||
_exec_serial_rule(ctx,new_version,s_rule+success_cnt,batch_cnt,multi_cmd_seq, &multi_cmd_cnt,success_cnt);
|
||||
_exec_serial_rule(ctx,new_version,s_rule+success_cnt,batch_cnt,expected_reply, &multi_cmd_cnt,success_cnt,renew_allowed);
|
||||
assert(multi_cmd_cnt<max_multi_cmd_num);
|
||||
success_cnt+=batch_cnt;
|
||||
}
|
||||
transaction_reply=_exec_serial_rule_end(ctx,new_version,server_time);
|
||||
transaction_reply=_exec_serial_rule_end(ctx,new_version,server_time, renew_allowed);
|
||||
if(1==mr_transaction_success(transaction_reply))
|
||||
{
|
||||
assert(transaction_reply->elements==multi_cmd_cnt+2);
|
||||
for(i=0;i<transaction_reply->elements;i++)
|
||||
assert(transaction_reply->elements-2<=multi_cmd_cnt);
|
||||
for(i=0;i<multi_cmd_cnt;i++)
|
||||
{
|
||||
p=transaction_reply->element[i];
|
||||
j=multi_cmd_seq[i];
|
||||
if(j!=-1&&0==mr_operation_success(p))
|
||||
{
|
||||
assert(j<serial_rule_num);
|
||||
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
|
||||
,"%s %s %d failed, rule id maybe conflicts.",(s_rule[j].op==MAAT_OP_ADD)?"ADD":"DEL"
|
||||
, s_rule[j].table_name,s_rule[j].rule_id);
|
||||
success_cnt--;
|
||||
//failed is acceptable
|
||||
//or transaciton is success
|
||||
//or continuation of last failed
|
||||
if(expected_reply[i].srule_seq==-1||1==mr_operation_success(p,&(expected_reply[i].reply))||last_failed==expected_reply[i].srule_seq)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
assert(j<serial_rule_num);
|
||||
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
|
||||
,"%s %s %d failed, rule id maybe conflict or not exist."
|
||||
,rm_op_str[s_rule[j].op]
|
||||
,s_rule[j].table_name,s_rule[j].rule_id);
|
||||
success_cnt--;
|
||||
last_failed=expected_reply[i].srule_seq;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
success_cnt=-1;
|
||||
}
|
||||
}
|
||||
freeReplyObject(transaction_reply);
|
||||
free(multi_cmd_seq);
|
||||
|
||||
error_out:
|
||||
if(renew_num>0&&renew_allowed!=1)
|
||||
{
|
||||
for(i=0;i<(unsigned int)serial_rule_num;i++)
|
||||
{
|
||||
if(s_rule[i].op==MAAT_OP_RENEW_TIMEOUT)
|
||||
{
|
||||
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
|
||||
,"%s %s %d is not allowed due to lock contention.",rm_op_str[MAAT_OP_RENEW_TIMEOUT]
|
||||
, s_rule[i].table_name,s_rule[i].rule_id);
|
||||
}
|
||||
}
|
||||
if(success_cnt>0)
|
||||
{
|
||||
success_cnt-=renew_num;
|
||||
}
|
||||
}
|
||||
free(expected_reply);
|
||||
return success_cnt;
|
||||
}
|
||||
|
||||
@@ -1236,29 +1371,7 @@ void cleanup_update_status(redisContext *ctx, void *logger)
|
||||
,entry_num);
|
||||
|
||||
}
|
||||
int redlock_try_lock(redisContext *ctx, const char* lock_name, long long expire)
|
||||
{
|
||||
redisReply* reply=NULL;
|
||||
int ret=0;
|
||||
reply=_wrap_redisCommand(ctx,"SET %s locked NX PX %lld", lock_name, expire);
|
||||
if(reply->type==REDIS_REPLY_NIL)
|
||||
{
|
||||
ret=0;
|
||||
}
|
||||
else
|
||||
{
|
||||
ret=1;
|
||||
}
|
||||
freeReplyObject(reply);
|
||||
return ret;
|
||||
}
|
||||
void redlock_unlock(redisContext * ctx, const char * lock_name)
|
||||
{
|
||||
redisReply* reply=NULL;
|
||||
reply=_wrap_redisCommand(ctx,"DEL %s", lock_name);
|
||||
freeReplyObject(reply);
|
||||
|
||||
}
|
||||
void redis_monitor_traverse(long long version,redisContext *c,
|
||||
void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
|
||||
int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para
|
||||
@@ -1277,7 +1390,7 @@ void redis_monitor_traverse(long long version,redisContext *c,
|
||||
if(feather->redis_write_ctx!=NULL&&feather->redis_write_ctx->err==0)//authorized to write
|
||||
{
|
||||
//For thread safe, deliberately use redis_read_ctx but not redis_write_ctx.
|
||||
if(1==redlock_try_lock(feather->redis_read_ctx, rm_expire_lock, 300*1000))
|
||||
if(1==redlock_try_lock(feather->redis_read_ctx, rm_expire_lock, rm_expire_lock_timeout))
|
||||
{
|
||||
check_maat_expiration(feather->redis_read_ctx, logger);
|
||||
cleanup_update_status(feather->redis_read_ctx, logger);
|
||||
@@ -1321,7 +1434,7 @@ void redis_monitor_traverse(long long version,redisContext *c,
|
||||
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"%d of %d rules are empty.",empty_value_num,rule_num);
|
||||
}
|
||||
}
|
||||
start(new_version,update_type,u_para);
|
||||
start(new_version,update_type,u_para);
|
||||
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Start %s update: %lld->%lld (%d entries).",
|
||||
update_type==CM_UPDATE_TYPE_INC?"INC":"FULL",version,new_version,rule_num);
|
||||
for(i=0;i<rule_num;i++)
|
||||
@@ -1490,7 +1603,7 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
|
||||
int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_rule, int line_num ,enum MAAT_OPERATION op)
|
||||
{
|
||||
int i=0;
|
||||
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
|
||||
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
|
||||
int ret=0, table_id=0,success_cnt=0;
|
||||
struct serial_rule_t *s_rule=NULL;
|
||||
long long server_time=0,absolute_expire_time=0;
|
||||
@@ -1539,34 +1652,21 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
|
||||
ret=-1;
|
||||
goto error_out;
|
||||
}
|
||||
}
|
||||
if(op==MAAT_OP_RENEW_TIMEOUT)
|
||||
{
|
||||
assert(line_rule[i]->expire_after>0);
|
||||
}
|
||||
if(line_rule[i]->expire_after>0)
|
||||
{
|
||||
absolute_expire_time=server_time+line_rule[i]->expire_after;
|
||||
}
|
||||
set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id,line_rule[i]->table_name,line_rule[i]->table_line, absolute_expire_time);
|
||||
}
|
||||
ret=0;
|
||||
while(success_cnt<line_num&&retry<10)
|
||||
{
|
||||
success_cnt=exec_serial_rule(_feather->redis_write_ctx,s_rule, line_num,server_time,_feather->logger);
|
||||
if(success_cnt<0)//transaction failed
|
||||
{
|
||||
retry++;
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
success_cnt=exec_serial_rule(_feather->redis_write_ctx,s_rule, line_num,server_time,_feather->logger);
|
||||
assert(success_cnt==line_num);
|
||||
ret=success_cnt;
|
||||
_feather->line_cmd_acc_num+=success_cnt;
|
||||
if(retry>5)
|
||||
{
|
||||
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_command
|
||||
,"Command set line id %d success after retry %d times."
|
||||
, line_rule[0]->rule_id, retry
|
||||
);
|
||||
_feather->line_cmd_acc_num+=success_cnt;
|
||||
|
||||
error_out:
|
||||
for(i=0;i<line_num;i++)
|
||||
@@ -1654,15 +1754,21 @@ int Maat_cmd_append(Maat_feather_t feather,struct Maat_cmd_t* cmd,enum MAAT_OPER
|
||||
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
|
||||
struct _Maat_cmd_inner_t* _cmd=(struct _Maat_cmd_inner_t*)cmd;
|
||||
int ret=0;
|
||||
_cmd->op=op;
|
||||
_cmd->op=op;
|
||||
assert(op==MAAT_OP_DEL||op==MAAT_OP_ADD||op==MAAT_OP_RENEW_TIMEOUT);
|
||||
assert(_cmd->next==NULL);
|
||||
assert(_cmd->next==NULL);
|
||||
if(op==MAAT_OP_RENEW_TIMEOUT)
|
||||
{
|
||||
{
|
||||
assert(cmd->expire_after>0);
|
||||
}
|
||||
}
|
||||
switch(op)
|
||||
{
|
||||
{
|
||||
case MAAT_OP_DEL:
|
||||
case MAAT_OP_RENEW_TIMEOUT:
|
||||
ret=reconstruct_cmd(_feather, _cmd);
|
||||
break;
|
||||
case MAAT_OP_ADD:
|
||||
ret=fix_table_name(_feather, cmd);
|
||||
break;
|
||||
}
|
||||
if(ret<0)
|
||||
@@ -1688,7 +1794,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
|
||||
int Maat_cmd_commit(Maat_feather_t feather)
|
||||
{
|
||||
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
|
||||
|
||||
|
||||
int ret=0,i=0;
|
||||
int new_region_num=0,new_group_num=0;
|
||||
int serial_rule_num=0,serial_rule_idx=0;
|
||||
@@ -1711,6 +1817,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
|
||||
{
|
||||
ret=connect_redis_for_write(_feather);
|
||||
if(ret!=0)
|
||||
{
|
||||
ret=-1;
|
||||
goto error_out;
|
||||
}
|
||||
@@ -1743,23 +1850,9 @@ int Maat_cmd_commit(Maat_feather_t feather)
|
||||
p=p->next;
|
||||
}
|
||||
assert(serial_rule_idx==serial_rule_num);
|
||||
transection_success=0;
|
||||
while(transection_success<serial_rule_num&&retry<10)
|
||||
{
|
||||
transection_success=exec_serial_rule(ctx, s_rule,serial_rule_num,_feather->server_time,_feather->logger);
|
||||
if(transection_success==-1)
|
||||
{
|
||||
retry++;
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(retry>5)
|
||||
{
|
||||
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_command
|
||||
,"MAAT retry for %d times.", retry);
|
||||
transection_success=0;
|
||||
transection_success=exec_serial_rule(ctx, s_rule,serial_rule_num,_feather->server_time,_feather->logger);
|
||||
assert(transection_success==serial_rule_num);
|
||||
ret=_feather->cmd_q_cnt;
|
||||
_feather->cmd_acc_num+=_feather->cmd_q_cnt;
|
||||
error_out:
|
||||
|
||||
Reference in New Issue
Block a user