diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index 206e19a..c595621 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -48,6 +48,7 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...) va_start(ap,format); reply = redisvCommand(c,format,ap); va_end(ap); + return (redisReply *)reply; } int connect_redis_for_write(_Maat_feather_t * feather) @@ -391,6 +392,15 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** return 0; } tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str); + if(tmp_reply->type!=REDIS_REPLY_STRING) + { + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, + "ZSCORE %s %s failed Version: %d->%d",rm_status_sset,reply->element[0]->str,version, version_in_redis); + free(tmp_reply); + free(reply); + goto FULL_UPDATE; + + } nearest_rule_version=read_redis_integer(tmp_reply); freeReplyObject(tmp_reply); tmp_reply=NULL; @@ -898,8 +908,8 @@ void cleanup_update_status(redisContext *ctx, void *logger) version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]); freeReplyObject(reply); - //For maat_version reset, do NOT use -inf to upper bound intentionally. - reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s %lld %lld",rm_expire_sset,version_lower_bound,version_upper_bound); + //To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally. + reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s %lld %lld",rm_status_sset,version_lower_bound,version_upper_bound); entry_num=read_redis_integer(reply); freeReplyObject(reply); diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp index 59c4763..ea45fee 100644 --- a/src/entry/Maat_rule.cpp +++ b/src/entry/Maat_rule.cpp @@ -28,7 +28,7 @@ #include "stream_fuzzy_hash.h" #include "gram_index_engine.h" -int MAAT_FRAME_VERSION_2_0_20170811=1; +int MAAT_FRAME_VERSION_2_0_20170814_2=1; const char *maat_module="MAAT Frame"; const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", @@ -1974,6 +1974,7 @@ int add_digest_rule(struct _Maat_table_info_t* table,struct db_digest_rule_t* db ,db_digest_rule->confidence_degree ,group_rule); MESA_lqueue_join_tail(scanner->gie_aux[table->table_id].update_q, &digest_rule, sizeof(void*)); + scanner->gie_total_q_size++; return 0; } int del_region_rule(struct _Maat_table_info_t* table,int region_id,int group_id,int rule_type,struct _Maat_scanner_t *maat_scanner,void* logger) @@ -2026,6 +2027,7 @@ int del_region_rule(struct _Maat_table_info_t* table,int region_id,int group_id, ,0 ,NULL); MESA_lqueue_join_tail(maat_scanner->gie_aux[i].update_q,&digest_rule, sizeof(void*)); + maat_scanner->gie_total_q_size++; break; default: assert(0); @@ -2925,6 +2927,7 @@ void do_scanner_update(struct _Maat_scanner_t* scanner,MESA_lqueue_head garbage_ ,scanner ,i); } + scanner->gie_total_q_size=0; if(scanner->tmp_district_map!=NULL) { tmp_map=scanner->district_map; @@ -3011,6 +3014,7 @@ void maat_finish_cb(void* u_para) struct _Maat_feather_t *feather=(struct _Maat_feather_t *)u_para; struct _Maat_table_info_t* p_table=NULL; struct _plugin_table_info* p_table_cb=NULL; + long expr_wait_q_cnt=0; int i=0,j=0,total=0; for(i=0;iscanner->cfg_num=total; feather->scanner->version=feather->maat_version; + expr_wait_q_cnt=MESA_lqueue_get_count(feather->scanner->region_update_q); + feather->postpone_q_size=expr_wait_q_cnt+feather->scanner->gie_total_q_size; if(time(NULL)-feather->scanner->last_update_time>feather->effect_interval_ms/1000) { do_scanner_update(feather->scanner @@ -3061,6 +3067,7 @@ void maat_finish_cb(void* u_para) MESA_handle_runtime_log(feather->logger,RLOG_LV_INFO,maat_module, "Inc config version %u build complete,%d entries in total.", feather->scanner->version,feather->scanner->cfg_num); + feather->postpone_q_size=0; } else { @@ -3208,8 +3215,8 @@ void *thread_rule_monitor(void *arg) if(feather->scanner!=NULL) { expr_wait_q_cnt=MESA_lqueue_get_count(feather->scanner->region_update_q); - feather->postpone_q_size=expr_wait_q_cnt; - if(expr_wait_q_cnt>0&&time(NULL)-feather->scanner->last_update_time>feather->effect_interval_ms/1000) + feather->postpone_q_size=expr_wait_q_cnt+feather->scanner->gie_total_q_size; + if(feather->postpone_q_size>0&&time(NULL)-feather->scanner->last_update_time>feather->effect_interval_ms/1000) { do_scanner_update(feather->scanner ,feather->garbage_q diff --git a/src/entry/Maat_rule_internal.h b/src/entry/Maat_rule_internal.h index 7ae040c..4fb4369 100644 --- a/src/entry/Maat_rule_internal.h +++ b/src/entry/Maat_rule_internal.h @@ -330,6 +330,7 @@ struct _Maat_scanner_t time_t last_update_time; long long *ref_cnt; //optimized for cache_alignment 64 rule_scanner_t region; + long gie_total_q_size; struct GIE_aux_t gie_aux[MAX_TABLE_NUM]; MESA_htable_handle region_hash; MESA_htable_handle group_hash;