增加Redis读取失败的出错处理,修复笔误导致的MAAT_UPDATE_STATUS淘汰未生效。
This commit is contained in:
@@ -48,6 +48,7 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...)
|
|||||||
va_start(ap,format);
|
va_start(ap,format);
|
||||||
reply = redisvCommand(c,format,ap);
|
reply = redisvCommand(c,format,ap);
|
||||||
va_end(ap);
|
va_end(ap);
|
||||||
|
|
||||||
return (redisReply *)reply;
|
return (redisReply *)reply;
|
||||||
}
|
}
|
||||||
int connect_redis_for_write(_Maat_feather_t * feather)
|
int connect_redis_for_write(_Maat_feather_t * feather)
|
||||||
@@ -391,6 +392,15 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str);
|
tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str);
|
||||||
|
if(tmp_reply->type!=REDIS_REPLY_STRING)
|
||||||
|
{
|
||||||
|
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
|
||||||
|
"ZSCORE %s %s failed Version: %d->%d",rm_status_sset,reply->element[0]->str,version, version_in_redis);
|
||||||
|
free(tmp_reply);
|
||||||
|
free(reply);
|
||||||
|
goto FULL_UPDATE;
|
||||||
|
|
||||||
|
}
|
||||||
nearest_rule_version=read_redis_integer(tmp_reply);
|
nearest_rule_version=read_redis_integer(tmp_reply);
|
||||||
freeReplyObject(tmp_reply);
|
freeReplyObject(tmp_reply);
|
||||||
tmp_reply=NULL;
|
tmp_reply=NULL;
|
||||||
@@ -898,8 +908,8 @@ void cleanup_update_status(redisContext *ctx, void *logger)
|
|||||||
version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]);
|
version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]);
|
||||||
freeReplyObject(reply);
|
freeReplyObject(reply);
|
||||||
|
|
||||||
//For maat_version reset, do NOT use -inf to upper bound intentionally.
|
//To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally.
|
||||||
reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s %lld %lld",rm_expire_sset,version_lower_bound,version_upper_bound);
|
reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s %lld %lld",rm_status_sset,version_lower_bound,version_upper_bound);
|
||||||
entry_num=read_redis_integer(reply);
|
entry_num=read_redis_integer(reply);
|
||||||
freeReplyObject(reply);
|
freeReplyObject(reply);
|
||||||
|
|
||||||
|
|||||||
@@ -28,7 +28,7 @@
|
|||||||
#include "stream_fuzzy_hash.h"
|
#include "stream_fuzzy_hash.h"
|
||||||
#include "gram_index_engine.h"
|
#include "gram_index_engine.h"
|
||||||
|
|
||||||
int MAAT_FRAME_VERSION_2_0_20170811=1;
|
int MAAT_FRAME_VERSION_2_0_20170814_2=1;
|
||||||
const char *maat_module="MAAT Frame";
|
const char *maat_module="MAAT Frame";
|
||||||
|
|
||||||
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
|
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
|
||||||
@@ -1974,6 +1974,7 @@ int add_digest_rule(struct _Maat_table_info_t* table,struct db_digest_rule_t* db
|
|||||||
,db_digest_rule->confidence_degree
|
,db_digest_rule->confidence_degree
|
||||||
,group_rule);
|
,group_rule);
|
||||||
MESA_lqueue_join_tail(scanner->gie_aux[table->table_id].update_q, &digest_rule, sizeof(void*));
|
MESA_lqueue_join_tail(scanner->gie_aux[table->table_id].update_q, &digest_rule, sizeof(void*));
|
||||||
|
scanner->gie_total_q_size++;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int del_region_rule(struct _Maat_table_info_t* table,int region_id,int group_id,int rule_type,struct _Maat_scanner_t *maat_scanner,void* logger)
|
int del_region_rule(struct _Maat_table_info_t* table,int region_id,int group_id,int rule_type,struct _Maat_scanner_t *maat_scanner,void* logger)
|
||||||
@@ -2026,6 +2027,7 @@ int del_region_rule(struct _Maat_table_info_t* table,int region_id,int group_id,
|
|||||||
,0
|
,0
|
||||||
,NULL);
|
,NULL);
|
||||||
MESA_lqueue_join_tail(maat_scanner->gie_aux[i].update_q,&digest_rule, sizeof(void*));
|
MESA_lqueue_join_tail(maat_scanner->gie_aux[i].update_q,&digest_rule, sizeof(void*));
|
||||||
|
maat_scanner->gie_total_q_size++;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
assert(0);
|
assert(0);
|
||||||
@@ -2925,6 +2927,7 @@ void do_scanner_update(struct _Maat_scanner_t* scanner,MESA_lqueue_head garbage_
|
|||||||
,scanner
|
,scanner
|
||||||
,i);
|
,i);
|
||||||
}
|
}
|
||||||
|
scanner->gie_total_q_size=0;
|
||||||
if(scanner->tmp_district_map!=NULL)
|
if(scanner->tmp_district_map!=NULL)
|
||||||
{
|
{
|
||||||
tmp_map=scanner->district_map;
|
tmp_map=scanner->district_map;
|
||||||
@@ -3011,6 +3014,7 @@ void maat_finish_cb(void* u_para)
|
|||||||
struct _Maat_feather_t *feather=(struct _Maat_feather_t *)u_para;
|
struct _Maat_feather_t *feather=(struct _Maat_feather_t *)u_para;
|
||||||
struct _Maat_table_info_t* p_table=NULL;
|
struct _Maat_table_info_t* p_table=NULL;
|
||||||
struct _plugin_table_info* p_table_cb=NULL;
|
struct _plugin_table_info* p_table_cb=NULL;
|
||||||
|
long expr_wait_q_cnt=0;
|
||||||
int i=0,j=0,total=0;
|
int i=0,j=0,total=0;
|
||||||
for(i=0;i<MAX_TABLE_NUM;i++)
|
for(i=0;i<MAX_TABLE_NUM;i++)
|
||||||
{
|
{
|
||||||
@@ -3052,6 +3056,8 @@ void maat_finish_cb(void* u_para)
|
|||||||
{
|
{
|
||||||
feather->scanner->cfg_num=total;
|
feather->scanner->cfg_num=total;
|
||||||
feather->scanner->version=feather->maat_version;
|
feather->scanner->version=feather->maat_version;
|
||||||
|
expr_wait_q_cnt=MESA_lqueue_get_count(feather->scanner->region_update_q);
|
||||||
|
feather->postpone_q_size=expr_wait_q_cnt+feather->scanner->gie_total_q_size;
|
||||||
if(time(NULL)-feather->scanner->last_update_time>feather->effect_interval_ms/1000)
|
if(time(NULL)-feather->scanner->last_update_time>feather->effect_interval_ms/1000)
|
||||||
{
|
{
|
||||||
do_scanner_update(feather->scanner
|
do_scanner_update(feather->scanner
|
||||||
@@ -3061,6 +3067,7 @@ void maat_finish_cb(void* u_para)
|
|||||||
MESA_handle_runtime_log(feather->logger,RLOG_LV_INFO,maat_module,
|
MESA_handle_runtime_log(feather->logger,RLOG_LV_INFO,maat_module,
|
||||||
"Inc config version %u build complete,%d entries in total.",
|
"Inc config version %u build complete,%d entries in total.",
|
||||||
feather->scanner->version,feather->scanner->cfg_num);
|
feather->scanner->version,feather->scanner->cfg_num);
|
||||||
|
feather->postpone_q_size=0;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -3208,8 +3215,8 @@ void *thread_rule_monitor(void *arg)
|
|||||||
if(feather->scanner!=NULL)
|
if(feather->scanner!=NULL)
|
||||||
{
|
{
|
||||||
expr_wait_q_cnt=MESA_lqueue_get_count(feather->scanner->region_update_q);
|
expr_wait_q_cnt=MESA_lqueue_get_count(feather->scanner->region_update_q);
|
||||||
feather->postpone_q_size=expr_wait_q_cnt;
|
feather->postpone_q_size=expr_wait_q_cnt+feather->scanner->gie_total_q_size;
|
||||||
if(expr_wait_q_cnt>0&&time(NULL)-feather->scanner->last_update_time>feather->effect_interval_ms/1000)
|
if(feather->postpone_q_size>0&&time(NULL)-feather->scanner->last_update_time>feather->effect_interval_ms/1000)
|
||||||
{
|
{
|
||||||
do_scanner_update(feather->scanner
|
do_scanner_update(feather->scanner
|
||||||
,feather->garbage_q
|
,feather->garbage_q
|
||||||
|
|||||||
@@ -330,6 +330,7 @@ struct _Maat_scanner_t
|
|||||||
time_t last_update_time;
|
time_t last_update_time;
|
||||||
long long *ref_cnt; //optimized for cache_alignment 64
|
long long *ref_cnt; //optimized for cache_alignment 64
|
||||||
rule_scanner_t region;
|
rule_scanner_t region;
|
||||||
|
long gie_total_q_size;
|
||||||
struct GIE_aux_t gie_aux[MAX_TABLE_NUM];
|
struct GIE_aux_t gie_aux[MAX_TABLE_NUM];
|
||||||
MESA_htable_handle region_hash;
|
MESA_htable_handle region_hash;
|
||||||
MESA_htable_handle group_hash;
|
MESA_htable_handle group_hash;
|
||||||
|
|||||||
Reference in New Issue
Block a user