1、由于取全量配置时不是原子的,keys命令拿到的配置,在get时已经置为OBSOLETE,进行了修复。

2、修复删除label时,忘记进行append计数;
This commit is contained in:
zhengchao
2017-08-07 16:21:23 +08:00
parent db5e4ef609
commit 84a75c4494
2 changed files with 98 additions and 35 deletions

View File

@@ -212,6 +212,7 @@ int del_rule_from_redis(redisContext* ctx, struct serial_rule_t* s_rule, long lo
{ {
redisAppendCommand(ctx,"ZREM %s %d",rm_label_sset redisAppendCommand(ctx,"ZREM %s %d",rm_label_sset
,s_rule->rule_id); ,s_rule->rule_id);
append_cmd_cnt++;
} }
return append_cmd_cnt; return append_cmd_cnt;
@@ -315,12 +316,12 @@ void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_
} }
int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** list,void* logger, unsigned int* new_version,int *update_type) int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** list,void* logger, unsigned int* new_version,int *update_type)
{ {
redisReply* reply=NULL,*tmp_reply=NULL; redisReply* reply=NULL,*sub_reply=NULL,*tmp_reply=NULL;
char err_buff[256]; char err_buff[256];
char op_str[4]; char op_str[4];
long long version_in_redis=0,nearest_rule_version=0; long long version_in_redis=0,nearest_rule_version=0;
int ret=0,retry=0; int ret=0,retry=0;
unsigned int i=0; unsigned int i=0,full_idx =0,append_cmd_cnt=0;
struct serial_rule_t *s_rule=NULL; struct serial_rule_t *s_rule=NULL;
@@ -381,7 +382,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
if(nearest_rule_version!=version+1) if(nearest_rule_version!=version+1)
{ {
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Noncontinuous VERSION Redis: %lld MAAT: %d.",tmp_reply->integer,version); "Noncontinuous VERSION Redis: %lld MAAT: %d.",nearest_rule_version,version);
goto FULL_UPDATE; goto FULL_UPDATE;
} }
@@ -416,24 +417,82 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
FULL_UPDATE: FULL_UPDATE:
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Initiate full udpate from version %d to %lld.",version,version_in_redis); "Initiate full udpate from version %d to %lld.",version,version_in_redis);
reply=(redisReply*)redisCommand(c, "KEYS EFFECTIVE_RULE:*"); append_cmd_cnt=0;
assert(reply->type==REDIS_REPLY_ARRAY); ret=redisAppendCommand(c, "MULTI");
s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t)); append_cmd_cnt++;
for(i=0;i<reply->elements;i++) ret=redisAppendCommand(c, "GET MAAT_VERSION");
append_cmd_cnt++;
ret=redisAppendCommand(c, "KEYS EFFECTIVE_RULE:*");
append_cmd_cnt++;
//consume reply "OK" and "QUEUED".
for(i=0;i<append_cmd_cnt;i++)
{ {
assert(reply->element[i]->type==REDIS_REPLY_STRING); _wrap_redisGetReply(c, &reply);
ret=sscanf(reply->element[i]->str,"%*[^:]:%[^,],%d",s_rule[i].table_name,&(s_rule[i].rule_id)); freeReplyObject(reply);
s_rule[i].op=MAAT_OP_ADD; reply=NULL;
}
reply=_wrap_redisCommand(c,"EXEC");
assert(reply->type==REDIS_REPLY_ARRAY);
*new_version=read_redis_integer(reply->element[0]);
sub_reply=reply->element[1];
assert(sub_reply->type==REDIS_REPLY_ARRAY);
s_rule=(struct serial_rule_t*)calloc(sub_reply->elements,sizeof(struct serial_rule_t));
for(full_idx=0;full_idx<sub_reply->elements;full_idx++)
{
assert(sub_reply->element[full_idx]->type==REDIS_REPLY_STRING);
ret=sscanf(sub_reply->element[full_idx]->str,"%*[^:]:%[^,],%d",s_rule[full_idx].table_name,&(s_rule[full_idx].rule_id));
s_rule[full_idx].op=MAAT_OP_ADD;
assert(ret==2); assert(ret==2);
} }
freeReplyObject(reply);
*list=s_rule; *list=s_rule;
*update_type=CM_UPDATE_TYPE_FULL; *update_type=CM_UPDATE_TYPE_FULL;
freeReplyObject(reply); return full_idx ;
reply=NULL; }
return i; void get_rm_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger)
{
int i=0,ret=0,failed_cnt=0,idx=0;
int *retry_ids=(int*)malloc(sizeof(int)*rule_num);
char redis_cmd[256];
redisReply* reply=NULL;
for(i=0;i<rule_num;i++)
{
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[rule_list[i].op]
,rule_list[i].table_name
,rule_list[i].rule_id);
ret=redisAppendCommand(c, redis_cmd);
assert(ret==REDIS_OK);
}
for(i=0;i<rule_num;i++)
{
ret=_wrap_redisGetReply(c,&reply);
if(reply->type==REDIS_REPLY_STRING)
{
rule_list[i].table_line=_maat_strdup(reply->str);
}
else
{
assert(reply->type==REDIS_REPLY_NIL);
retry_ids[failed_cnt]=i;
failed_cnt++;
}
freeReplyObject(reply);
}
for(i=0;i<failed_cnt;i++)
{
idx=retry_ids[i];
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[0]
,rule_list[idx].table_name
,rule_list[idx].rule_id);
reply=_wrap_redisCommand(c, redis_cmd);
assert(reply->type==REDIS_REPLY_STRING);
rule_list[idx].table_line=_maat_strdup(reply->str);
freeReplyObject(reply);
}
free(retry_ids);
return;
} }
int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cnt, int* new_group_cnt) int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cnt, int* new_group_cnt)
{ {
int serial_num=0; int serial_num=0;
@@ -471,10 +530,16 @@ int reconstruct_cmd(struct _Maat_feather_t *feather, struct _Maat_cmd_inner_t* _
void* logger=feather->logger; void* logger=feather->logger;
int config_id=cmd->compile.config_id; int config_id=cmd->compile.config_id;
if(feather->scanner==NULL)
{
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_command
,"MAAT not ready.");
return -1;
}
compile_inner=(struct _Maat_compile_inner_t *)HASH_fetch_by_id(feather->scanner->compile_hash, config_id); compile_inner=(struct _Maat_compile_inner_t *)HASH_fetch_by_id(feather->scanner->compile_hash, config_id);
if(compile_inner==NULL) if(compile_inner==NULL)
{ {
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_command MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_command
,"config %d not exist." ,"config %d not exist."
,config_id); ,config_id);
return -1; return -1;
@@ -779,11 +844,9 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
const unsigned char* dec_key, const unsigned char* dec_key,
_Maat_feather_t* feather) _Maat_feather_t* feather)
{ {
redisReply* data_reply=NULL;
unsigned int rule_num=0,i=0; unsigned int rule_num=0,i=0;
int table_id=0; int table_id=0;
int ret=0; int ret=0;
char redis_cmd[256];
struct serial_rule_t* rule_list=NULL; struct serial_rule_t* rule_list=NULL;
int update_type=0; int update_type=0;
unsigned int new_version=0; unsigned int new_version=0;
@@ -799,19 +862,11 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
{ {
return; return;
} }
for(i=0;i<rule_num;i++) get_rm_value(c,rule_list,rule_num, logger);
{
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[rule_list[i].op]
,rule_list[i].table_name
,rule_list[i].rule_id);
ret=redisAppendCommand(c, redis_cmd);
assert(ret==REDIS_OK);
}
start(new_version,update_type,u_para); start(new_version,update_type,u_para);
for(i=0;i<rule_num;i++) for(i=0;i<rule_num;i++)
{ {
ret=_wrap_redisGetReply(c,&data_reply);
assert(ret==REDIS_OK);
if(rule_list[i].op==MAAT_OP_DEL) if(rule_list[i].op==MAAT_OP_DEL)
{ {
ret=map_str2int(feather->map_tablename2id,rule_list[i].table_name,&table_id); ret=map_str2int(feather->map_tablename2id,rule_list[i].table_name,&table_id);
@@ -820,13 +875,15 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
continue; continue;
} }
table_type=feather->p_table_info[table_id]->table_type; table_type=feather->p_table_info[table_id]->table_type;
invalidate_line(data_reply->str,table_type,feather->p_table_info[table_id]->valid_flag_column); invalidate_line(rule_list[i].table_line,table_type,feather->p_table_info[table_id]->valid_flag_column);
} }
update(rule_list[i].table_name,data_reply->str,u_para); update(rule_list[i].table_name,rule_list[i].table_line,u_para);
freeReplyObject(data_reply);
} }
finish(u_para); finish(u_para);
// no need to calll empty_serial_rules for(i=0;i<rule_num;i++)
{
empty_serial_rules(rule_list+i);
}
free(rule_list); free(rule_list);
rule_list=NULL; rule_list=NULL;
return; return;
@@ -965,7 +1022,13 @@ int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule
{ {
ret=exec_serial_rule(_feather->redis_write_ctx,&s_rule, 1); ret=exec_serial_rule(_feather->redis_write_ctx,&s_rule, 1);
retry++; retry++;
assert(retry<5); }
if(retry>10)
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module
,"Command set line id %d success after retry %d times."
, line_rule->rule_id
);
} }
return 0; return 0;
} }

View File

@@ -28,7 +28,7 @@
#include "stream_fuzzy_hash.h" #include "stream_fuzzy_hash.h"
#include "gram_index_engine.h" #include "gram_index_engine.h"
int MAAT_FRAME_VERSION_2_0_20170701=1; int MAAT_FRAME_VERSION_2_0_20170807=1;
const char *maat_module="MAAT Frame"; const char *maat_module="MAAT Frame";
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",