Maat读取redis出错时,未能清空已append的数据,可能导致TCP接收队列阻塞。

This commit is contained in:
zhengchao
2017-09-11 18:58:29 +08:00
parent 4674737fee
commit c7af487e41
2 changed files with 32 additions and 6 deletions

View File

@@ -513,9 +513,11 @@ FULL_UPDATE:
return full_idx ; return full_idx ;
} }
int get_rm_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger)
int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger)
{ {
int i=0,ret=0,failed_cnt=0,idx=0; int i=0,ret=0,failed_cnt=0,idx=0;
int error_happened=0;
int *retry_ids=(int*)malloc(sizeof(int)*rule_num); int *retry_ids=(int*)malloc(sizeof(int)*rule_num);
char redis_cmd[256]; char redis_cmd[256];
redisReply* reply=NULL; redisReply* reply=NULL;
@@ -547,16 +549,21 @@ int get_rm_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,vo
,"Redis GET %s:%s,%d failed",rm_key_prefix[rule_list[i].op] ,"Redis GET %s:%s,%d failed",rm_key_prefix[rule_list[i].op]
,rule_list[i].table_name ,rule_list[i].table_name
,rule_list[i].rule_id); ,rule_list[i].rule_id);
free(retry_ids); error_happened=1;
return -1;
} }
} }
freeReplyObject(reply); freeReplyObject(reply);
} }
if(error_happened==1)
{
free(retry_ids);
return -1;
}
for(i=0;i<failed_cnt;i++) for(i=0;i<failed_cnt;i++)
{ {
idx=retry_ids[i]; idx=retry_ids[i];
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[0] snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[MAAT_OP_DEL]
,rule_list[idx].table_name ,rule_list[idx].table_name
,rule_list[idx].rule_id); ,rule_list[idx].rule_id);
reply=_wrap_redisCommand(c, redis_cmd); reply=_wrap_redisCommand(c, redis_cmd);
@@ -567,6 +574,25 @@ int get_rm_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,vo
free(retry_ids); free(retry_ids);
return 0; return 0;
} }
int get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger)
{
int max_redis_batch=4*1024,batch_cnt=0;
int success_cnt=0,ret=0;
while(success_cnt<rule_num)
{
batch_cnt=MIN(rule_num-success_cnt,max_redis_batch);
ret=_get_maat_redis_value(c,rule_list+success_cnt,batch_cnt,logger);
if(ret<0)
{
return -1;
}
else
{
success_cnt+=batch_cnt;
}
}
return 0;
}
int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cnt, int* new_group_cnt) int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cnt, int* new_group_cnt)
{ {
int serial_num=0; int serial_num=0;
@@ -1010,7 +1036,7 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
} }
if(rule_num>0) if(rule_num>0)
{ {
ret=get_rm_value(c,rule_list,rule_num, logger); ret=get_maat_redis_value(c,rule_list,rule_num, logger);
if(ret<0) if(ret<0)
{ {
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update."); MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update.");

View File

@@ -30,7 +30,7 @@
#include "stream_fuzzy_hash.h" #include "stream_fuzzy_hash.h"
#include "gram_index_engine.h" #include "gram_index_engine.h"
int MAAT_FRAME_VERSION_2_0_20170910=1; int MAAT_FRAME_VERSION_2_0_20170911=1;
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
"unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""}; "unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""};