1、hiredis客户端append超过2400万后出现段错误,maat每次最多append 8K条数据。

2、修复redis中配置为空时,不能正确清空配置的bug。
This commit is contained in:
zhengchao
2017-08-24 17:39:40 +08:00
parent f5b1b08ee9
commit d223895e3d
3 changed files with 50 additions and 25 deletions

View File

@@ -16,7 +16,7 @@ const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"};
const char* rm_status_sset="MAAT_UPDATE_STATUS";
const char* rm_expire_sset="MAAT_EXPIRE_TIMER";
const char* rm_label_sset="MAAT_LABEL_INDEX";
const char* rm_version_sset="/AAT_VERSION_TIMER";
const char* rm_version_sset="MAAT_VERSION_TIMER";
const static int MAAT_REDIS_SYNC_TIME=30*60;
struct serial_rule_t //rm= Redis Maat
@@ -370,7 +370,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
if(reply->type==REDIS_REPLY_NIL||reply->type==REDIS_REPLY_ERROR)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"GET MAAT_VERSION failed, maybe Redis is busy.");
return 0;
return -1;
}
}
else
@@ -380,7 +380,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"GET MAAT_VERSION failed %s. Reconnecting...",err_buff);
_wrap_redisReconnect(c,logger);
return 0;
return -1;
}
version_in_redis=read_redis_integer(reply);
assert(version_in_redis>=version);
@@ -405,14 +405,14 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
__redis_strerror_r(errno,err_buff,sizeof(err_buff));
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"GET %s failed %s.",rm_status_sset,err_buff);
return 0;
return -1;
}
assert(reply->type==REDIS_REPLY_ARRAY);
if(reply->elements==0)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%d %d",rm_status_sset,version,version_in_redis);
freeReplyObject(reply);
return 0;
return -1;
}
tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str);
if(tmp_reply->type!=REDIS_REPLY_STRING)
@@ -732,7 +732,27 @@ int mr_transaction_success(redisReply* data_reply)
return 1;
}
}
int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time);
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time)
{
int max_redis_batch=8*1024,batch_cnt=0;
int success_cnt=0,ret=0;
while(success_cnt<serial_rule_num)
{
batch_cnt=MIN(serial_rule_num-success_cnt,max_redis_batch);
ret=_exec_serial_rule(ctx,s_rule+success_cnt,batch_cnt, server_time);
if(ret==1)
{
success_cnt+=batch_cnt;
}
else
{
break;
}
}
return success_cnt;
}
int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time)
{
int append_cmd_cnt=0,i=0;
long long maat_redis_version=0;
@@ -859,7 +879,7 @@ void check_maat_expiration(redisContext *ctx, void *logger)
{
unsigned int i=0,s_rule_num=0;
int ret=0;
int is_success=0;
int success_cnt=0;
redisReply* data_reply=NULL;
struct serial_rule_t* s_rule=NULL;
long long server_time=0;
@@ -881,9 +901,9 @@ void check_maat_expiration(redisContext *ctx, void *logger)
assert(ret==2);
}
freeReplyObject(data_reply);
is_success=exec_serial_rule(ctx,s_rule, s_rule_num,server_time);
success_cnt=exec_serial_rule(ctx,s_rule, s_rule_num,server_time);
if(is_success==1)
if(success_cnt==(int)s_rule_num)
{
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
,"Succesfully expired %d rules in Redis.", s_rule_num);
@@ -957,7 +977,7 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
int table_id=0;
int ret=0;
struct serial_rule_t* rule_list=NULL;
int update_type=0;
int update_type=CM_UPDATE_TYPE_INC;
unsigned int new_version=0;
enum MAAT_TABLE_TYPE table_type;
void* logger=feather->logger;
@@ -968,27 +988,30 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
cleanup_update_status(feather->redis_read_ctx, logger);
}
rule_num=get_rm_key_list(version, c, &rule_list, logger,&new_version, &update_type);
if(rule_num==0)
if(rule_num<0||(rule_num==0&&update_type==CM_UPDATE_TYPE_INC))//error or nothing changed
{
return;
}
ret=get_rm_value(c,rule_list,rule_num, logger);
if(ret<0)
if(rule_num>0)
{
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update.");
goto clean_up;
ret=get_rm_value(c,rule_list,rule_num, logger);
if(ret<0)
{
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update.");
goto clean_up;
}
}
start(new_version,update_type,u_para);
for(i=0;i<rule_num;i++)
{
ret=map_str2int(feather->map_tablename2id,rule_list[i].table_name,&table_id);
if(ret<0)//Unrecognized table.
{
continue;
}
table_type=feather->p_table_info[table_id]->table_type;
if(rule_list[i].op==MAAT_OP_DEL)
{
ret=map_str2int(feather->map_tablename2id,rule_list[i].table_name,&table_id);
if(ret<0)//Unrecognized table.
{
continue;
}
table_type=feather->p_table_info[table_id]->table_type;
invalidate_line(rule_list[i].table_line,table_type,feather->p_table_info[table_id]->valid_flag_column);
}
update(rule_list[i].table_name,rule_list[i].table_line,u_para);
@@ -1134,7 +1157,7 @@ int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule
}
set_serial_rule(&s_rule, op,line_rule->rule_id,line_rule->label_id,line_rule->table_name,line_rule->table_line, absolute_expire_time);
ret=0;
while(!ret)
while(ret==0)
{
ret=exec_serial_rule(_feather->redis_write_ctx,&s_rule, 1,server_time);
retry++;
@@ -1303,10 +1326,10 @@ int Maat_cmd_commit(Maat_feather_t feather)
}
assert(serial_rule_idx==serial_rule_num);
transection_success=0;
while(!transection_success)
while(transection_success<serial_rule_num)
{
transection_success=exec_serial_rule(ctx, s_rule,serial_rule_num,_feather->server_time);
if(transection_success!=1)
transection_success+=exec_serial_rule(ctx, s_rule+transection_success,serial_rule_num-transection_success,_feather->server_time);
if(transection_success<serial_rule_num)
{
retry++;
assert(retry<5);