maat_command当出现配置ID冲突时告警。

This commit is contained in:
zhengchao
2017-10-11 19:15:36 +08:00
parent b5937a97b9
commit 9f54de4480

View File

@@ -780,29 +780,19 @@ int mr_transaction_success(redisReply* data_reply)
return 1;
}
}
int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time);
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time)
int mr_operation_success(redisReply* data_reply)
{
int max_redis_batch=1*1024,batch_cnt=0;
int success_cnt=0,ret=0;
while(success_cnt<serial_rule_num)
if(data_reply->type==REDIS_REPLY_INTEGER&&data_reply->integer==0)
{
batch_cnt=MIN(serial_rule_num-success_cnt,max_redis_batch);
ret=_exec_serial_rule(ctx,s_rule+success_cnt,batch_cnt, server_time);
if(ret==1)
{
success_cnt+=batch_cnt;
}
else
{
break;
}
return 0;
}
return success_cnt;
return 1;
}
int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time)
#define REDIS_OP_PER_SRULE 8
int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger)
{
int append_cmd_cnt=0,i=0;
int i=0,j=0,ret=0;
long long maat_redis_version=0;
redisReply* data_reply=NULL;
int redis_transaction_success=1;
@@ -814,7 +804,9 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_
freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx,"MULTI");
freeReplyObject(data_reply);
append_cmd_cnt=0;
int max_append_cnt=serial_rule_num*REDIS_OP_PER_SRULE+4;
int append_cmd_cnt=0;
int *pipeline_seq=(int*)calloc(sizeof(int),max_append_cnt);
assert(server_time>0);
for(i=0;i<serial_rule_num;i++)
{
@@ -824,12 +816,14 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_
,s_rule[i].table_name
,s_rule[i].rule_id
,s_rule[i].table_line);
pipeline_seq[append_cmd_cnt]=i;
append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"ZADD %s NX %lld ADD,%s,%d",rm_status_sset
,maat_redis_version
,s_rule[i].table_name
,s_rule[i].rule_id);
pipeline_seq[append_cmd_cnt]=i;
append_cmd_cnt++;
if(s_rule[i].timeout>0)
{
@@ -837,6 +831,7 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_
,s_rule[i].timeout
,s_rule[i].table_name
,s_rule[i].rule_id);
pipeline_seq[append_cmd_cnt]=i;
append_cmd_cnt++;
}
if(s_rule[i].label_id>0)
@@ -844,12 +839,19 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_
redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset
,s_rule[i].label_id
,s_rule[i].rule_id);
pipeline_seq[append_cmd_cnt]=i;
append_cmd_cnt++;
}
}
else
{
append_cmd_cnt+=del_rule_from_redis(ctx,s_rule+i,maat_redis_version);
ret=del_rule_from_redis(ctx,s_rule+i,maat_redis_version);
for(j=0;j<ret;j++)
{
pipeline_seq[append_cmd_cnt+j]=i;
}
append_cmd_cnt+=ret;
}
}
@@ -867,10 +869,37 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_
{
redis_transaction_success=0;
}
if(0==mr_operation_success(data_reply))
{
j=pipeline_seq[i];
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
,"exec rule %s, %d failed, content %s, rule id maybe conflicts.", s_rule[j].rule_id, s_rule[j].table_name, s_rule[j].table_line);
}
freeReplyObject(data_reply);
}
free(pipeline_seq);
return redis_transaction_success;
}
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger)
{
int max_redis_batch=1*1024,batch_cnt=0;
int success_cnt=0,ret=0;
while(success_cnt<serial_rule_num)
{
batch_cnt=MIN(serial_rule_num-success_cnt,max_redis_batch);
ret=_exec_serial_rule(ctx,s_rule+success_cnt,batch_cnt, server_time,logger);
if(ret==1)
{
success_cnt+=batch_cnt;
}
else
{
break;
}
}
return success_cnt;
}
int fix_table_name(_Maat_feather_t* feather,struct Maat_cmd_t* cmd)
{
int i=0,j=0,ret=0;
@@ -949,7 +978,7 @@ void check_maat_expiration(redisContext *ctx, void *logger)
assert(ret==2);
}
freeReplyObject(data_reply);
success_cnt=exec_serial_rule(ctx,s_rule, s_rule_num,server_time);
success_cnt=exec_serial_rule(ctx,s_rule, s_rule_num,server_time, logger);
if(success_cnt==(int)s_rule_num)
{
@@ -1192,7 +1221,7 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
ret=map_str2int(_feather->map_tablename2id, line_rule[i]->table_name, &table_id);
if(ret<0)
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command
,"Command set line id %d failed: unknown table %s."
, line_rule[i]->rule_id
, line_rule[i]->table_name);
@@ -1201,7 +1230,7 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
}
if(TABLE_TYPE_PLUGIN!=_feather->p_table_info[table_id]->table_type)
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command
,"Command set line id %d failed: table %s is not a plugin table."
, line_rule[i]->rule_id
, line_rule[i]->table_name);
@@ -1214,7 +1243,7 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
if(ret<0||
(op==MAAT_OP_ADD&&line_rule[i]->table_line[ret]!='1'))
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command
,"Command set line id %d failed: illegal valid flag."
, line_rule[i]->rule_id);
ret=-1;
@@ -1229,12 +1258,12 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
ret=0;
while(success_cnt<line_num)
{
success_cnt+=exec_serial_rule(_feather->redis_write_ctx,s_rule+success_cnt, line_num-success_cnt,server_time);
success_cnt+=exec_serial_rule(_feather->redis_write_ctx,s_rule+success_cnt, line_num-success_cnt,server_time,_feather->logger);
retry++;
}
if(retry>10)
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_command
,"Command set line id %d success after retry %d times."
, line_rule[0]->rule_id
);
@@ -1411,7 +1440,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
transection_success=0;
while(transection_success<serial_rule_num)
{
transection_success+=exec_serial_rule(ctx, s_rule+transection_success,serial_rule_num-transection_success,_feather->server_time);
transection_success+=exec_serial_rule(ctx, s_rule+transection_success,serial_rule_num-transection_success,_feather->server_time,_feather->logger);
if(transection_success<serial_rule_num)
{
retry++;