增加Redis重启、不可用场景下的错误处理,已在线部署一个局点。

This commit is contained in:
zhengchao
2017-08-12 11:13:47 +08:00
parent 448a712a20
commit 94f69b0f6d
4 changed files with 202 additions and 175 deletions

View File

@@ -619,21 +619,23 @@ int Maat_initiate_feather(Maat_feather_t feather)
{ {
_feather->REDIS_MODE_ON=1; _feather->REDIS_MODE_ON=1;
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module , MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module ,
"Maat initiate from Redis %s:%hu" "Maat initiate from Redis %s:%hu db%d."
,_feather->redis_ip ,_feather->redis_ip
,_feather->redis_port); ,_feather->redis_port
,_feather->redis_index);
_feather->redis_read_ctx=redisConnectWithTimeout(_feather->redis_ip,_feather->redis_port,_feather->connect_timeout); _feather->redis_read_ctx=redisConnectWithTimeout(_feather->redis_ip,_feather->redis_port,_feather->connect_timeout);
if(_feather->redis_read_ctx==NULL) if(_feather->redis_read_ctx==NULL)
{ {
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
,"Redis connect %s:%d failed." ,"Redis connect %s:%d failed."
,_feather->redis_ip,_feather->redis_port); ,_feather->redis_ip,_feather->redis_port);
return -1;
} }
redisEnableKeepAlive(_feather->redis_read_ctx); else
reply=_wrap_redisCommand(_feather->redis_read_ctx, "select %d",_feather->redis_index); {
freeReplyObject(reply); redisEnableKeepAlive(_feather->redis_read_ctx);
redis_monitor_traverse(_feather->maat_version reply=_wrap_redisCommand(_feather->redis_read_ctx, "select %d",_feather->redis_index);
freeReplyObject(reply);
redis_monitor_traverse(_feather->maat_version
,_feather->redis_read_ctx ,_feather->redis_read_ctx
,maat_start_cb ,maat_start_cb
,maat_update_cb ,maat_update_cb
@@ -641,6 +643,7 @@ int Maat_initiate_feather(Maat_feather_t feather)
, _feather , _feather
,_feather->decrypt_key //Not used. ,_feather->decrypt_key //Not used.
,_feather); ,_feather);
}
} }
else else
{ {

View File

@@ -14,7 +14,7 @@ const char* maat_redis_command="MAAT_REDIS_COMMAND";
const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"}; const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"};
const char* rm_status_sset="MAAT_UPDATE_STATUS"; const char* rm_status_sset="MAAT_UPDATE_STATUS";
const char* rm_expire_sset="MAAT_RULE_TIMER"; const char* rm_expire_sset="MAAT_EXPIRE_TIMER";
const char* rm_label_sset="MAAT_LABEL_INDEX"; const char* rm_label_sset="MAAT_LABEL_INDEX";
const char* rm_version_sset="MAAT_VERSION_TIMER"; const char* rm_version_sset="MAAT_VERSION_TIMER";
const static int MAAT_REDIS_SYNC_TIME=30*60; const static int MAAT_REDIS_SYNC_TIME=30*60;
@@ -43,12 +43,12 @@ int _wrap_redisGetReply(redisContext *c, redisReply **reply)
} }
redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...) redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...)
{ {
va_list ap; va_list ap;
void *reply = NULL; void *reply = NULL;
va_start(ap,format); va_start(ap,format);
reply = redisvCommand(c,format,ap); reply = redisvCommand(c,format,ap);
va_end(ap); va_end(ap);
return (redisReply *)reply; return (redisReply *)reply;
} }
int connect_redis_for_write(_Maat_feather_t * feather) int connect_redis_for_write(_Maat_feather_t * feather)
{ {
@@ -59,8 +59,8 @@ int connect_redis_for_write(_Maat_feather_t * feather)
if(feather->redis_write_ctx==NULL) if(feather->redis_write_ctx==NULL)
{ {
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
,"Redis connect %s:%d for write failed." ,"Redis connect %s:%d for write failed."
,feather->redis_ip,feather->redis_port); ,feather->redis_ip,feather->redis_port);
ret=-1; ret=-1;
} }
else else
@@ -190,35 +190,35 @@ int del_rule_from_redis(redisContext* ctx, struct serial_rule_t* s_rule, long lo
{ {
int append_cmd_cnt=0; int append_cmd_cnt=0;
redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d" redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d"
,rm_key_prefix[MAAT_OP_ADD] ,rm_key_prefix[MAAT_OP_ADD]
,s_rule->table_name ,s_rule->table_name
,s_rule->rule_id ,s_rule->rule_id
,rm_key_prefix[MAAT_OP_DEL] ,rm_key_prefix[MAAT_OP_DEL]
,s_rule->table_name ,s_rule->table_name
,s_rule->rule_id ,s_rule->rule_id
); );
append_cmd_cnt++; append_cmd_cnt++;
redisAppendCommand(ctx,"EXPIRE %s:%s,%d %d",rm_key_prefix[MAAT_OP_DEL] redisAppendCommand(ctx,"EXPIRE %s:%s,%d %d",rm_key_prefix[MAAT_OP_DEL]
,s_rule->table_name ,s_rule->table_name
,s_rule->rule_id ,s_rule->rule_id
,MAAT_REDIS_SYNC_TIME); ,MAAT_REDIS_SYNC_TIME);
append_cmd_cnt++; append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements. //NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"ZADD %s NX %d DEL,%s,%d",rm_status_sset redisAppendCommand(ctx,"ZADD %s NX %d DEL,%s,%d",rm_status_sset
,new_version ,new_version
,s_rule->table_name ,s_rule->table_name
,s_rule->rule_id); ,s_rule->rule_id);
append_cmd_cnt++; append_cmd_cnt++;
// Try to remove from expiration sorted set, no matter wheather it exists or not. // Try to remove from expiration sorted set, no matter wheather it exists or not.
redisAppendCommand(ctx,"ZREM %s %s,%d",rm_expire_sset redisAppendCommand(ctx,"ZREM %s %s,%d",rm_expire_sset
,s_rule->table_name ,s_rule->table_name
,s_rule->rule_id); ,s_rule->rule_id);
append_cmd_cnt++; append_cmd_cnt++;
if(s_rule->label_id>0) if(s_rule->label_id>0)
{ {
redisAppendCommand(ctx,"ZREM %s %d",rm_label_sset redisAppendCommand(ctx,"ZREM %s %d",rm_label_sset
,s_rule->rule_id); ,s_rule->rule_id);
append_cmd_cnt++; append_cmd_cnt++;
} }
@@ -231,64 +231,64 @@ void serialize_region(const struct Maat_region_t* p,int group_id, char* buff,int
{ {
case REGION_IP: case REGION_IP:
ret=snprintf(buff,size,"%d\t%d\t%d\t%s\t%s\t%hu\t%hu\t%s\t%s\t%hu\t%hu\t%d\t%d\t1" ret=snprintf(buff,size,"%d\t%d\t%d\t%s\t%s\t%hu\t%hu\t%s\t%s\t%hu\t%hu\t%d\t%d\t1"
,p->region_id ,p->region_id
,group_id ,group_id
,p->ip_rule.addr_type ,p->ip_rule.addr_type
,p->ip_rule.src_ip ,p->ip_rule.src_ip
,p->ip_rule.mask_src_ip ,p->ip_rule.mask_src_ip
,p->ip_rule.src_port ,p->ip_rule.src_port
,p->ip_rule.mask_src_port ,p->ip_rule.mask_src_port
,p->ip_rule.dst_ip ,p->ip_rule.dst_ip
,p->ip_rule.mask_dst_ip ,p->ip_rule.mask_dst_ip
,p->ip_rule.dst_port ,p->ip_rule.dst_port
,p->ip_rule.mask_dst_port ,p->ip_rule.mask_dst_port
,p->ip_rule.protocol ,p->ip_rule.protocol
,p->ip_rule.direction); ,p->ip_rule.direction);
break; break;
case REGION_EXPR: case REGION_EXPR:
if(p->expr_rule.district==NULL) if(p->expr_rule.district==NULL)
{ {
ret=snprintf(buff,size,"%d\t%d\t%s\t%d\t%d\t%d\t1" ret=snprintf(buff,size,"%d\t%d\t%s\t%d\t%d\t%d\t1"
,p->region_id ,p->region_id
,group_id ,group_id
,p->expr_rule.keywords ,p->expr_rule.keywords
,p->expr_rule.expr_type ,p->expr_rule.expr_type
,p->expr_rule.match_method ,p->expr_rule.match_method
,p->expr_rule.hex_bin); ,p->expr_rule.hex_bin);
} }
else //expr_plus else //expr_plus
{ {
ret=snprintf(buff,size,"%d\t%d\t%s\t%s\t%d\t%d\t%d\t1" ret=snprintf(buff,size,"%d\t%d\t%s\t%s\t%d\t%d\t%d\t1"
,p->region_id ,p->region_id
,group_id ,group_id
,p->expr_rule.keywords ,p->expr_rule.keywords
,p->expr_rule.district ,p->expr_rule.district
,p->expr_rule.expr_type ,p->expr_rule.expr_type
,p->expr_rule.match_method ,p->expr_rule.match_method
,p->expr_rule.hex_bin); ,p->expr_rule.hex_bin);
} }
break; break;
case REGION_INTERVAL: case REGION_INTERVAL:
ret=snprintf(buff,size,"%d\t%d\t%u\t%u\t1" ret=snprintf(buff,size,"%d\t%d\t%u\t%u\t1"
,p->region_id ,p->region_id
,group_id ,group_id
,p->interval_rule.low_boundary ,p->interval_rule.low_boundary
,p->interval_rule.up_boundary); ,p->interval_rule.up_boundary);
break; break;
case REGION_DIGEST: case REGION_DIGEST:
ret=snprintf(buff,size,"%d\t%d\t%llu\t%s\t%hd\t1" ret=snprintf(buff,size,"%d\t%d\t%llu\t%s\t%hd\t1"
,p->region_id ,p->region_id
,group_id ,group_id
,p->digest_rule.orgin_len ,p->digest_rule.orgin_len
,p->digest_rule.digest_string ,p->digest_rule.digest_string
,p->digest_rule.confidence_degree); ,p->digest_rule.confidence_degree);
break; break;
case REGION_SIMILARITY://not support yet case REGION_SIMILARITY://not support yet
ret=snprintf(buff,size,"%d\t%d\t%s\t%hd\t1" ret=snprintf(buff,size,"%d\t%d\t%s\t%hd\t1"
,p->region_id ,p->region_id
,group_id ,group_id
,p->similarity_rule.target ,p->similarity_rule.target
,p->similarity_rule.threshold); ,p->similarity_rule.threshold);
break; break;
default: default:
assert(0); assert(0);
@@ -327,34 +327,36 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
char err_buff[256]; char err_buff[256];
char op_str[4]; char op_str[4];
long long version_in_redis=0,nearest_rule_version=0; long long version_in_redis=0,nearest_rule_version=0;
int ret=0,retry=0; int ret=0;
unsigned int i=0,full_idx =0,append_cmd_cnt=0; unsigned int i=0,full_idx =0,append_cmd_cnt=0;
struct serial_rule_t *s_rule=NULL; struct serial_rule_t *s_rule=NULL;
reply=(redisReply*)redisCommand(c, "GET MAAT_VERSION");
while(retry<1) if(reply!=NULL)
{ {
reply=(redisReply*)redisCommand(c, "GET MAAT_VERSION");
if(reply!=NULL) if(reply->type==REDIS_REPLY_NIL||reply->type==REDIS_REPLY_ERROR)
{ {
break; MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"GET MAAT_VERSION failed, maybe Redis is busy.");
return 0;
} }
if(c->err==REDIS_ERR_EOF) }
else
{
memset(err_buff,0,sizeof(err_buff));
__redis_strerror_r(errno,err_buff,sizeof(err_buff));
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"GET MAAT_VERSION failed %s. Reconnecting...",err_buff);
ret=redisReconnect(c);
if(ret==REDIS_OK)
{ {
__redis_strerror_r(errno,err_buff,sizeof(err_buff)); MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Reconnect success.");
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"GET MAAT_VERSION failed %s. Reconnecting...",err_buff);
ret=redisReconnect(c);
retry++;
continue;
} }
else else
{ {
__redis_strerror_r(errno,err_buff,sizeof(err_buff)); MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Reconnect failed.");
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, }
"GET MAAT_VERSION failed %s.",err_buff); return 0;
return 0;
}
} }
version_in_redis=read_redis_integer(reply); version_in_redis=read_redis_integer(reply);
assert(version_in_redis>=version); assert(version_in_redis>=version);
@@ -364,13 +366,13 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
return 0; return 0;
} }
*new_version=version_in_redis; *new_version=version_in_redis;
if(version==0) if(version==0)
{ {
goto FULL_UPDATE; goto FULL_UPDATE;
} }
//Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis. //Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis.
//The elements are considered to be ordered from low to high scores(version). //The elements are considered to be ordered from low to high scores(version).
reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%d %d",rm_status_sset,version,version_in_redis); reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%d %d",rm_status_sset,version,version_in_redis);
@@ -378,10 +380,16 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
{ {
__redis_strerror_r(errno,err_buff,sizeof(err_buff)); __redis_strerror_r(errno,err_buff,sizeof(err_buff));
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"GET %s failed %s.",rm_status_sset,err_buff); "GET %s failed %s.",rm_status_sset,err_buff);
return 0; return 0;
} }
assert(reply->type==REDIS_REPLY_ARRAY); assert(reply->type==REDIS_REPLY_ARRAY);
if(reply->elements==0)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%d %d",rm_status_sset,version,version_in_redis);
freeReplyObject(reply);
return 0;
}
tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str); tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str);
nearest_rule_version=read_redis_integer(tmp_reply); nearest_rule_version=read_redis_integer(tmp_reply);
freeReplyObject(tmp_reply); freeReplyObject(tmp_reply);
@@ -389,13 +397,13 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
if(nearest_rule_version!=version+1) if(nearest_rule_version!=version+1)
{ {
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Noncontinuous VERSION Redis: %lld MAAT: %d.",nearest_rule_version,version); "Noncontinuous VERSION Redis: %lld MAAT: %d.",nearest_rule_version,version);
goto FULL_UPDATE; goto FULL_UPDATE;
} }
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Inc Update form version %d to %lld.",version,version_in_redis); "Inc Update form version %d to %lld (%lld entries).",version,version_in_redis,reply->elements);
s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t)); s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t));
for(i=0;i<reply->elements;i++) for(i=0;i<reply->elements;i++)
{ {
@@ -421,7 +429,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
return i; return i;
FULL_UPDATE: FULL_UPDATE:
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Initiate full udpate from version %d to %lld.",version,version_in_redis); "Initiate full udpate from version %d to %lld.",version,version_in_redis);
append_cmd_cnt=0; append_cmd_cnt=0;
ret=redisAppendCommand(c, "MULTI"); ret=redisAppendCommand(c, "MULTI");
append_cmd_cnt++; append_cmd_cnt++;
@@ -452,10 +460,10 @@ FULL_UPDATE:
freeReplyObject(reply); freeReplyObject(reply);
*list=s_rule; *list=s_rule;
*update_type=CM_UPDATE_TYPE_FULL; *update_type=CM_UPDATE_TYPE_FULL;
return full_idx ; return full_idx ;
} }
void get_rm_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger) int get_rm_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger)
{ {
int i=0,ret=0,failed_cnt=0,idx=0; int i=0,ret=0,failed_cnt=0,idx=0;
int *retry_ids=(int*)malloc(sizeof(int)*rule_num); int *retry_ids=(int*)malloc(sizeof(int)*rule_num);
@@ -464,8 +472,8 @@ void get_rm_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,v
for(i=0;i<rule_num;i++) for(i=0;i<rule_num;i++)
{ {
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[rule_list[i].op] snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[rule_list[i].op]
,rule_list[i].table_name ,rule_list[i].table_name
,rule_list[i].rule_id); ,rule_list[i].rule_id);
ret=redisAppendCommand(c, redis_cmd); ret=redisAppendCommand(c, redis_cmd);
assert(ret==REDIS_OK); assert(ret==REDIS_OK);
} }
@@ -478,9 +486,20 @@ void get_rm_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,v
} }
else else
{ {
assert(reply->type==REDIS_REPLY_NIL); if(reply->type==REDIS_REPLY_NIL)
retry_ids[failed_cnt]=i; {
failed_cnt++; retry_ids[failed_cnt]=i;
failed_cnt++;
}
else
{
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
,"Redis GET %s:%s,%d failed",rm_key_prefix[rule_list[i].op]
,rule_list[i].table_name
,rule_list[i].rule_id);
free(retry_ids);
return -1;
}
} }
freeReplyObject(reply); freeReplyObject(reply);
} }
@@ -488,15 +507,15 @@ void get_rm_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,v
{ {
idx=retry_ids[i]; idx=retry_ids[i];
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[0] snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[0]
,rule_list[idx].table_name ,rule_list[idx].table_name
,rule_list[idx].rule_id); ,rule_list[idx].rule_id);
reply=_wrap_redisCommand(c, redis_cmd); reply=_wrap_redisCommand(c, redis_cmd);
assert(reply->type==REDIS_REPLY_STRING); assert(reply->type==REDIS_REPLY_STRING);
rule_list[idx].table_line=_maat_strdup(reply->str); rule_list[idx].table_line=_maat_strdup(reply->str);
freeReplyObject(reply); freeReplyObject(reply);
} }
free(retry_ids); free(retry_ids);
return; return 0;
} }
int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cnt, int* new_group_cnt) int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cnt, int* new_group_cnt)
{ {
@@ -533,20 +552,20 @@ int reconstruct_cmd(struct _Maat_feather_t *feather, struct _Maat_cmd_inner_t* _
struct _Maat_group_inner_t* group_inner=NULL; struct _Maat_group_inner_t* group_inner=NULL;
struct _Maat_region_inner_t* region_inner=NULL; struct _Maat_region_inner_t* region_inner=NULL;
void* logger=feather->logger; void* logger=feather->logger;
int config_id=cmd->compile.config_id; int config_id=cmd->compile.config_id;
if(feather->scanner==NULL) if(feather->scanner==NULL)
{ {
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_command MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_command
,"MAAT not ready."); ,"MAAT not ready.");
return -1; return -1;
} }
compile_inner=(struct _Maat_compile_inner_t *)HASH_fetch_by_id(feather->scanner->compile_hash, config_id); compile_inner=(struct _Maat_compile_inner_t *)HASH_fetch_by_id(feather->scanner->compile_hash, config_id);
if(compile_inner==NULL) if(compile_inner==NULL)
{ {
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_command MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_command
,"config %d not exist." ,"config %d not exist."
,config_id); ,config_id);
return -1; return -1;
} }
cmd->group_num=compile_inner->group_cnt; cmd->group_num=compile_inner->group_cnt;
@@ -561,7 +580,7 @@ int reconstruct_cmd(struct _Maat_feather_t *feather, struct _Maat_cmd_inner_t* _
} }
group_cmd=&(cmd->groups[grp_idx]); group_cmd=&(cmd->groups[grp_idx]);
group_cmd->group_id=group_inner->group_id; group_cmd->group_id=group_inner->group_id;
if(group_inner->ref_cnt>0) if(group_inner->ref_cnt>0)
{ {
continue; continue;
@@ -606,12 +625,12 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
if(op==MAAT_OP_ADD) if(op==MAAT_OP_ADD)
{ {
snprintf(line,sizeof(line),"%d\t%d\t%hhd\t%hhd\t%hhd\t0\t%s\t1\t%d",p_m_rule->config_id snprintf(line,sizeof(line),"%d\t%d\t%hhd\t%hhd\t%hhd\t0\t%s\t1\t%d",p_m_rule->config_id
,p_m_rule->service_id ,p_m_rule->service_id
,p_m_rule->action ,p_m_rule->action
,p_m_rule->do_blacklist ,p_m_rule->do_blacklist
,p_m_rule->do_log ,p_m_rule->do_log
,p_m_rule->service_defined ,p_m_rule->service_defined
,cmd->group_num); ,cmd->group_num);
set_serial_rule(list+rule_num,MAAT_OP_ADD,cmd->compile.config_id,cmd->label_id,feather->compile_tn,line,timeout); set_serial_rule(list+rule_num,MAAT_OP_ADD,cmd->compile.config_id,cmd->label_id,feather->compile_tn,line,timeout);
} }
@@ -631,7 +650,7 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
feather->base_grp_seq++; feather->base_grp_seq++;
} }
snprintf(line,sizeof(line),"%d\t%d\t1",p_group->group_id snprintf(line,sizeof(line),"%d\t%d\t1",p_group->group_id
,p_m_rule->config_id); ,p_m_rule->config_id);
set_serial_rule(list+rule_num,MAAT_OP_ADD,p_group->group_id,0,feather->group_tn,line,timeout); set_serial_rule(list+rule_num,MAAT_OP_ADD,p_group->group_id,0,feather->group_tn,line,timeout);
} }
else else
@@ -655,12 +674,12 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
} }
serialize_region(p_region, p_group->group_id, line, sizeof(line)); serialize_region(p_region, p_group->group_id, line, sizeof(line));
set_serial_rule(list+rule_num,MAAT_OP_ADD set_serial_rule(list+rule_num,MAAT_OP_ADD
,p_region->region_id,0,p_region->table_name,line,0); ,p_region->region_id,0,p_region->table_name,line,0);
} }
else else
{ {
set_serial_rule(list+rule_num,MAAT_OP_DEL set_serial_rule(list+rule_num,MAAT_OP_DEL
,p_region->region_id,0,p_region->table_name,NULL,0); ,p_region->region_id,0,p_region->table_name,NULL,0);
} }
rule_num++; rule_num++;
@@ -700,29 +719,29 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_r
if(s_rule[i].op==MAAT_OP_ADD) if(s_rule[i].op==MAAT_OP_ADD)
{ {
redisAppendCommand(ctx,"SET %s:%s,%d %s",rm_key_prefix[MAAT_OP_ADD] redisAppendCommand(ctx,"SET %s:%s,%d %s",rm_key_prefix[MAAT_OP_ADD]
,s_rule[i].table_name ,s_rule[i].table_name
,s_rule[i].rule_id ,s_rule[i].rule_id
,s_rule[i].table_line); ,s_rule[i].table_line);
append_cmd_cnt++; append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements. //NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"ZADD %s NX %lld ADD,%s,%d",rm_status_sset redisAppendCommand(ctx,"ZADD %s NX %lld ADD,%s,%d",rm_status_sset
,maat_redis_version ,maat_redis_version
,s_rule[i].table_name ,s_rule[i].table_name
,s_rule[i].rule_id); ,s_rule[i].rule_id);
append_cmd_cnt++; append_cmd_cnt++;
if(s_rule[i].timeout>0) if(s_rule[i].timeout>0)
{ {
redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset
,s_rule[i].timeout ,s_rule[i].timeout
,s_rule[i].table_name ,s_rule[i].table_name
,s_rule[i].rule_id); ,s_rule[i].rule_id);
append_cmd_cnt++; append_cmd_cnt++;
} }
if(s_rule[i].label_id>0) if(s_rule[i].label_id>0)
{ {
redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset
,s_rule[i].label_id ,s_rule[i].label_id
,s_rule[i].rule_id); ,s_rule[i].rule_id);
append_cmd_cnt++; append_cmd_cnt++;
} }
} }
@@ -758,7 +777,7 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_cmd_t* cmd)
struct Maat_group_t* p_group=NULL; struct Maat_group_t* p_group=NULL;
struct Maat_region_t* p_region=NULL; struct Maat_region_t* p_region=NULL;
enum MAAT_TABLE_TYPE table_type; enum MAAT_TABLE_TYPE table_type;
struct _Maat_compile_inner_t *compile_rule=NULL; struct _Maat_compile_inner_t *compile_rule=NULL;
if(feather->scanner!=NULL) if(feather->scanner!=NULL)
{ {
@@ -766,7 +785,7 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_cmd_t* cmd)
if(compile_rule!=NULL) if(compile_rule!=NULL)
{ {
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
,"Maat rule %d already exisits.",cmd->compile.config_id); ,"Maat rule %d already exisits.",cmd->compile.config_id);
return -1; return -1;
} }
} }
@@ -781,8 +800,8 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_cmd_t* cmd)
if(ret<0) if(ret<0)
{ {
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
,"Unknown table %s of Maat_cmd_t[%d]->group[%d]->region[%d]." ,"Unknown table %s of Maat_cmd_t[%d]->group[%d]->region[%d]."
,table_name,cmd->compile.config_id,i,j); ,table_name,cmd->compile.config_id,i,j);
return -1; return -1;
} }
@@ -790,10 +809,10 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_cmd_t* cmd)
if(table_type!=feather->p_table_info[table_id]->table_type) if(table_type!=feather->p_table_info[table_id]->table_type)
{ {
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
,"Table %s not support region type %d of Maat_cmd_t[%d]->group[%d]->region[%d]." ,"Table %s not support region type %d of Maat_cmd_t[%d]->group[%d]->region[%d]."
,table_name ,table_name
,p_region->region_type ,p_region->region_type
,cmd->compile.config_id,i,j); ,cmd->compile.config_id,i,j);
return -1; return -1;
} }
free((char*)p_region->table_name); free((char*)p_region->table_name);
@@ -810,7 +829,7 @@ void check_maat_expiration(redisContext *ctx, void *logger)
redisReply* data_reply=NULL; redisReply* data_reply=NULL;
struct serial_rule_t* s_rule=NULL; struct serial_rule_t* s_rule=NULL;
long long server_time=0; long long server_time=0;
server_time=redis_server_time(ctx); server_time=redis_server_time(ctx);
data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_expire_sset,server_time); data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_expire_sset,server_time);
@@ -829,18 +848,18 @@ void check_maat_expiration(redisContext *ctx, void *logger)
} }
freeReplyObject(data_reply); freeReplyObject(data_reply);
is_success=exec_serial_rule(ctx,s_rule, s_rule_num,server_time); is_success=exec_serial_rule(ctx,s_rule, s_rule_num,server_time);
if(is_success==1) if(is_success==1)
{ {
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
,"Succesfully expried %d rules in Redis.", s_rule_num); ,"Succesfully expired %d rules in Redis.", s_rule_num);
} }
else else
{ {
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
,"Failed to expried %d rules in Redis.", s_rule_num); ,"Failed to expired %d rules in Redis, try later.", s_rule_num);
} }
free(s_rule); free(s_rule);
return; return;
} }
@@ -851,7 +870,7 @@ void cleanup_update_status(redisContext *ctx, void *logger)
long long server_time=0, version_upper_bound=0,version_lower_bound=0,version_num=0,entry_num=0; long long server_time=0, version_upper_bound=0,version_lower_bound=0,version_num=0,entry_num=0;
server_time=redis_server_time(ctx); server_time=redis_server_time(ctx);
reply=_wrap_redisCommand(ctx,"MULTI"); reply=_wrap_redisCommand(ctx,"MULTI");
freeReplyObject(reply); freeReplyObject(reply);
redisAppendCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_version_sset,server_time-MAAT_REDIS_SYNC_TIME); redisAppendCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_version_sset,server_time-MAAT_REDIS_SYNC_TIME);
@@ -885,20 +904,20 @@ void cleanup_update_status(redisContext *ctx, void *logger)
freeReplyObject(reply); freeReplyObject(reply);
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
,"Clean up updaste status from version %lld to %lld (%lld versions, %lld entries)." ,"Clean up update status from version %lld to %lld (%lld versions, %lld entries)."
,version_lower_bound ,version_lower_bound
,version_upper_bound ,version_upper_bound
,version_num ,version_num
,entry_num); ,entry_num);
} }
void redis_monitor_traverse(unsigned int version,redisContext *c, void redis_monitor_traverse(unsigned int version,redisContext *c,
void (*start)(unsigned int ,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para void (*start)(unsigned int ,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para
void (*finish)(void*),//u_para void (*finish)(void*),//u_para
void* u_para, void* u_para,
const unsigned char* dec_key, const unsigned char* dec_key,
_Maat_feather_t* feather) _Maat_feather_t* feather)
{ {
unsigned int rule_num=0,i=0; unsigned int rule_num=0,i=0;
int table_id=0; int table_id=0;
@@ -919,8 +938,12 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
{ {
return; return;
} }
get_rm_value(c,rule_list,rule_num, logger); ret=get_rm_value(c,rule_list,rule_num, logger);
if(ret<0)
{
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update.");
goto clean_up;
}
start(new_version,update_type,u_para); start(new_version,update_type,u_para);
for(i=0;i<rule_num;i++) for(i=0;i<rule_num;i++)
{ {
@@ -937,6 +960,7 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
update(rule_list[i].table_name,rule_list[i].table_line,u_para); update(rule_list[i].table_name,rule_list[i].table_line,u_para);
} }
finish(u_para); finish(u_para);
clean_up:
for(i=0;i<rule_num;i++) for(i=0;i<rule_num;i++)
{ {
empty_serial_rules(rule_list+i); empty_serial_rules(rule_list+i);
@@ -1054,18 +1078,18 @@ int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule
if(ret<0) if(ret<0)
{ {
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
,"Command set line id %d failed: unknown table %s." ,"Command set line id %d failed: unknown table %s."
, line_rule->rule_id , line_rule->rule_id
, line_rule->table_name); , line_rule->table_name);
return -1; return -1;
} }
if(TABLE_TYPE_PLUGIN!=_feather->p_table_info[table_id]->table_type) if(TABLE_TYPE_PLUGIN!=_feather->p_table_info[table_id]->table_type)
{ {
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
,"Command set line id %d failed: table %s is not a plugin table." ,"Command set line id %d failed: table %s is not a plugin table."
, line_rule->rule_id , line_rule->rule_id
, line_rule->table_name); , line_rule->table_name);
return -1; return -1;
} }
if( line_rule->expire_after>0) if( line_rule->expire_after>0)
@@ -1083,9 +1107,9 @@ int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule
if(retry>10) if(retry>10)
{ {
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module
,"Command set line id %d success after retry %d times." ,"Command set line id %d success after retry %d times."
, line_rule->rule_id , line_rule->rule_id
); );
} }
return 0; return 0;
} }
@@ -1229,7 +1253,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
assert(data_reply->type==REDIS_REPLY_INTEGER); assert(data_reply->type==REDIS_REPLY_INTEGER);
_feather->base_rgn_seq=data_reply->integer-new_region_num; _feather->base_rgn_seq=data_reply->integer-new_region_num;
freeReplyObject(data_reply); freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_GROUP %d",new_group_num); data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_GROUP %d",new_group_num);
assert(data_reply->type==REDIS_REPLY_INTEGER); assert(data_reply->type==REDIS_REPLY_INTEGER);
_feather->base_grp_seq=data_reply->integer-new_group_num; _feather->base_grp_seq=data_reply->integer-new_group_num;
@@ -1264,7 +1288,7 @@ error_out:
} }
_feather->cmd_qhead=_feather->cmd_qtail=NULL; _feather->cmd_qhead=_feather->cmd_qtail=NULL;
_feather->cmd_q_cnt=0; _feather->cmd_q_cnt=0;
for(i=0;i<serial_rule_num;i++) for(i=0;i<serial_rule_num;i++)
{ {
empty_serial_rules(s_rule+i); empty_serial_rules(s_rule+i);
@@ -1307,9 +1331,9 @@ int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsi
} }
} }
data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"ZRANGEBYSCORE %s %d %d" data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"ZRANGEBYSCORE %s %d %d"
,rm_label_sset ,rm_label_sset
,label_id ,label_id
,label_id); ,label_id);
for(i=0;i<data_reply->elements&&i<size;i++) for(i=0;i<data_reply->elements&&i<size;i++)
{ {
output_ids[i]=atoi(data_reply->element[i]->str); output_ids[i]=atoi(data_reply->element[i]->str);

View File

@@ -28,7 +28,7 @@
#include "stream_fuzzy_hash.h" #include "stream_fuzzy_hash.h"
#include "gram_index_engine.h" #include "gram_index_engine.h"
int MAAT_FRAME_VERSION_2_0_20170810=1; int MAAT_FRAME_VERSION_2_0_20170811=1;
const char *maat_module="MAAT Frame"; const char *maat_module="MAAT Frame";
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
@@ -3052,7 +3052,7 @@ void maat_finish_cb(void* u_para)
{ {
feather->scanner->cfg_num=total; feather->scanner->cfg_num=total;
feather->scanner->version=feather->maat_version; feather->scanner->version=feather->maat_version;
if(time(NULL)-feather->scanner->last_update_time>60) if(time(NULL)-feather->scanner->last_update_time>feather->effect_interval_ms/1000)
{ {
do_scanner_update(feather->scanner do_scanner_update(feather->scanner
,feather->garbage_q ,feather->garbage_q

View File

@@ -4,7 +4,7 @@ port="6379"
echo "Reseting Redis For Maat..." echo "Reseting Redis For Maat..."
redis-cli -h $host -p $port GET MAAT_VERSION redis-cli -h $host -p $port GET MAAT_VERSION
redis-cli -h $host -p $port FLUSHALL redis-cli -h $host -p $port FLUSHALL
redis-cli -h $host -p $port SET MAAT_VERSION "1" redis-cli -h $host -p $port SET MAAT_VERSION "0"
redis-cli -h $host -p $port SET SEQUENCE_REGION "1" redis-cli -h $host -p $port SET SEQUENCE_REGION "1"
redis-cli -h $host -p $port SET SEQUENCE_GROUP "1" redis-cli -h $host -p $port SET SEQUENCE_GROUP "1"