diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index c595621..ba9a44c 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -16,7 +16,7 @@ const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"}; const char* rm_status_sset="MAAT_UPDATE_STATUS"; const char* rm_expire_sset="MAAT_EXPIRE_TIMER"; const char* rm_label_sset="MAAT_LABEL_INDEX"; -const char* rm_version_sset="MAAT_VERSION_TIMER"; +const char* rm_version_sset="/AAT_VERSION_TIMER"; const static int MAAT_REDIS_SYNC_TIME=30*60; struct serial_rule_t //rm= Redis Maat @@ -45,10 +45,22 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...) { va_list ap; void *reply = NULL; - va_start(ap,format); - reply = redisvCommand(c,format,ap); - va_end(ap); - + int ret=0,retry=0; + while(reply==NULL&&retry<2) + { + va_start(ap,format); + reply = redisvCommand(c,format,ap); + va_end(ap); + if(reply==NULL) + { + ret=redisReconnect(c); + retry++; + if(ret==REDIS_OK) + { + break; + } + } + } return (redisReply *)reply; } int connect_redis_for_write(_Maat_feather_t * feather) @@ -161,6 +173,9 @@ void invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq) case TABLE_TYPE_DIGEST: offset=6; break; + case TABLE_TYPE_SIMILARITY: + offset=5; + break; case TABLE_TYPE_EXPR_PLUS: offset=8; break; @@ -284,7 +299,7 @@ void serialize_region(const struct Maat_region_t* p,int group_id, char* buff,int ,p->digest_rule.digest_string ,p->digest_rule.confidence_degree); break; - case REGION_SIMILARITY://not support yet + case REGION_SIMILARITY: ret=snprintf(buff,size,"%d\t%d\t%s\t%hd\t1" ,p->region_id ,group_id @@ -322,6 +337,22 @@ void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_ } return; } +int _wrap_redisReconnect(redisContext* c, void*logger) +{ + int ret=0; + ret=redisReconnect(c); + if(ret==REDIS_OK) + { + + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Reconnect success."); + return 0; + } + else + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Reconnect failed."); + return -1; + } +} int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** list,void* logger, unsigned int* new_version,int *update_type) { redisReply* reply=NULL,*sub_reply=NULL,*tmp_reply=NULL; @@ -348,15 +379,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** __redis_strerror_r(errno,err_buff,sizeof(err_buff)); MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, "GET MAAT_VERSION failed %s. Reconnecting...",err_buff); - ret=redisReconnect(c); - if(ret==REDIS_OK) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Reconnect success."); - } - else - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Reconnect failed."); - } + _wrap_redisReconnect(c,logger); return 0; } version_in_redis=read_redis_integer(reply); @@ -684,7 +707,7 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st } serialize_region(p_region, p_group->group_id, line, sizeof(line)); set_serial_rule(list+rule_num,MAAT_OP_ADD - ,p_region->region_id,0,p_region->table_name,line,0); + ,p_region->region_id,0,p_region->table_name,line,timeout); } else { @@ -724,6 +747,7 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_r data_reply=_wrap_redisCommand(ctx,"MULTI"); freeReplyObject(data_reply); append_cmd_cnt=0; + assert(server_time>0); for(i=0;imap_tablename2id, line_rule->table_name, &table_id); if(ret<0) { @@ -1102,16 +1126,17 @@ int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule , line_rule->table_name); return -1; } + + server_time=redis_server_time(_feather->redis_write_ctx); if( line_rule->expire_after>0) { - absolute_expire_time=redis_server_time(_feather->redis_write_ctx); - absolute_expire_time+=line_rule->expire_after; + absolute_expire_time=server_time+line_rule->expire_after; } set_serial_rule(&s_rule, op,line_rule->rule_id,line_rule->label_id,line_rule->table_name,line_rule->table_line, absolute_expire_time); ret=0; while(!ret) { - ret=exec_serial_rule(_feather->redis_write_ctx,&s_rule, 1,_feather->server_time); + ret=exec_serial_rule(_feather->redis_write_ctx,&s_rule, 1,server_time); retry++; } if(retry>10) diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp index ea45fee..f76885c 100644 --- a/src/entry/Maat_rule.cpp +++ b/src/entry/Maat_rule.cpp @@ -28,7 +28,7 @@ #include "stream_fuzzy_hash.h" #include "gram_index_engine.h" -int MAAT_FRAME_VERSION_2_0_20170814_2=1; +int MAAT_FRAME_VERSION_2_0_20170816=1; const char *maat_module="MAAT Frame"; const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", @@ -986,20 +986,21 @@ void op_expr_add_rule(struct op_expr_t* op_expr,scan_rule_t* p_rule) op_expr->rule_type=p_rule->rule_type; return; } -GIE_digest_t* create_digest_rule(int id,short op,const char* digest, +GIE_digest_t* create_digest_rule(unsigned int id,short op,const char* digest, short cfds_lvl,struct _Maat_group_inner_t* tag) { GIE_digest_t* rule=(GIE_digest_t*)calloc(sizeof(GIE_digest_t),1); - int digest_len=strlen(digest); + int digest_len=0; rule->id=id; rule->operation=op; - rule->sfh_length=digest_len; if(digest!=NULL) { + digest_len=strlen(digest); rule->sfh=(char*)calloc(sizeof(char),digest_len+1); memcpy(rule->sfh,digest,digest_len); } + rule->sfh_length=digest_len; rule->cfds_lvl=cfds_lvl; rule->tag=(void*)tag; return rule; @@ -1459,6 +1460,7 @@ unsigned int del_region_from_group(struct _Maat_group_inner_t* group,int region_ for(j=0;jexpr_id_cnt;j++) { output_expr_id[j]=region_rule->expr_id_lb+j; + assert(output_expr_id[j]>0); } assert(j<=output_size); region_rule->region_id=0; @@ -1960,7 +1962,7 @@ int add_digest_rule(struct _Maat_table_info_t* table,struct db_digest_rule_t* db HASH_add_by_id(scanner->group_hash, db_digest_rule->group_id, group_rule); } expr_id=scanner->exprid_generator++; - u_para=add_region_to_group(group_rule,table->table_id,db_digest_rule->region_id,expr_id,district_id,TABLE_TYPE_DIGEST); + u_para=add_region_to_group(group_rule,table->table_id,db_digest_rule->region_id,district_id,expr_id,TABLE_TYPE_DIGEST); if(u_para==NULL) { return -1; @@ -2026,7 +2028,7 @@ int del_region_rule(struct _Maat_table_info_t* table,int region_id,int group_id, ,NULL ,0 ,NULL); - MESA_lqueue_join_tail(maat_scanner->gie_aux[i].update_q,&digest_rule, sizeof(void*)); + MESA_lqueue_join_tail(maat_scanner->gie_aux[table->table_id].update_q,&digest_rule, sizeof(void*)); maat_scanner->gie_total_q_size++; break; default: diff --git a/src/entry/gram_index_engine.c b/src/entry/gram_index_engine.c index 8a4576f..8f2d9cd 100644 --- a/src/entry/gram_index_engine.c +++ b/src/entry/gram_index_engine.c @@ -9,7 +9,7 @@ #include "gram_index_engine.h" #include "queue.h" -#define HTABLE_SIZE 32*1024 +#define HTABLE_SIZE 128*1024 #define GRAM_CNT_MAX 2 #define GRAM_MAX 128 #define TOLERENCE_SIZE 0