diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index f22edfc..469637d 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -780,29 +780,19 @@ int mr_transaction_success(redisReply* data_reply) return 1; } } -int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time); -int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time) +int mr_operation_success(redisReply* data_reply) { - int max_redis_batch=1*1024,batch_cnt=0; - int success_cnt=0,ret=0; - while(success_cnttype==REDIS_REPLY_INTEGER&&data_reply->integer==0) { - batch_cnt=MIN(serial_rule_num-success_cnt,max_redis_batch); - ret=_exec_serial_rule(ctx,s_rule+success_cnt,batch_cnt, server_time); - if(ret==1) - { - success_cnt+=batch_cnt; - } - else - { - break; - } + return 0; } - return success_cnt; + return 1; } -int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time) + +#define REDIS_OP_PER_SRULE 8 +int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger) { - int append_cmd_cnt=0,i=0; + int i=0,j=0,ret=0; long long maat_redis_version=0; redisReply* data_reply=NULL; int redis_transaction_success=1; @@ -814,7 +804,9 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_ freeReplyObject(data_reply); data_reply=_wrap_redisCommand(ctx,"MULTI"); freeReplyObject(data_reply); - append_cmd_cnt=0; + int max_append_cnt=serial_rule_num*REDIS_OP_PER_SRULE+4; + int append_cmd_cnt=0; + int *pipeline_seq=(int*)calloc(sizeof(int),max_append_cnt); assert(server_time>0); for(i=0;i0) { @@ -837,6 +831,7 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_ ,s_rule[i].timeout ,s_rule[i].table_name ,s_rule[i].rule_id); + pipeline_seq[append_cmd_cnt]=i; append_cmd_cnt++; } if(s_rule[i].label_id>0) @@ -844,12 +839,19 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_ redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset ,s_rule[i].label_id ,s_rule[i].rule_id); + pipeline_seq[append_cmd_cnt]=i; append_cmd_cnt++; } } else { - append_cmd_cnt+=del_rule_from_redis(ctx,s_rule+i,maat_redis_version); + ret=del_rule_from_redis(ctx,s_rule+i,maat_redis_version); + for(j=0;jmap_tablename2id, line_rule[i]->table_name, &table_id); if(ret<0) { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module + MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command ,"Command set line id %d failed: unknown table %s." , line_rule[i]->rule_id , line_rule[i]->table_name); @@ -1201,7 +1230,7 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru } if(TABLE_TYPE_PLUGIN!=_feather->p_table_info[table_id]->table_type) { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module + MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command ,"Command set line id %d failed: table %s is not a plugin table." , line_rule[i]->rule_id , line_rule[i]->table_name); @@ -1214,7 +1243,7 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru if(ret<0|| (op==MAAT_OP_ADD&&line_rule[i]->table_line[ret]!='1')) { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module + MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command ,"Command set line id %d failed: illegal valid flag." , line_rule[i]->rule_id); ret=-1; @@ -1229,12 +1258,12 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru ret=0; while(success_cntredis_write_ctx,s_rule+success_cnt, line_num-success_cnt,server_time); + success_cnt+=exec_serial_rule(_feather->redis_write_ctx,s_rule+success_cnt, line_num-success_cnt,server_time,_feather->logger); retry++; } if(retry>10) { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module + MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_command ,"Command set line id %d success after retry %d times." , line_rule[0]->rule_id ); @@ -1411,7 +1440,7 @@ int Maat_cmd_commit(Maat_feather_t feather) transection_success=0; while(transection_successserver_time); + transection_success+=exec_serial_rule(ctx, s_rule+transection_success,serial_rule_num-transection_success,_feather->server_time,_feather->logger); if(transection_success