diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index f87b680..ec14be0 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -17,6 +17,7 @@ const char* rm_status_sset="MAAT_UPDATE_STATUS"; const char* rm_expire_sset="MAAT_EXPIRE_TIMER"; const char* rm_label_sset="MAAT_LABEL_INDEX"; const char* rm_version_sset="MAAT_VERSION_TIMER"; +const char* rm_expire_lock="EXPIRE_OP_LOCK"; const static int MAAT_REDIS_SYNC_TIME=30*60; @@ -552,7 +553,12 @@ int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int ru snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[MAAT_OP_DEL] ,rule_list[idx].table_name ,rule_list[idx].rule_id); - reply=_wrap_redisCommand(c, redis_cmd); + ret=redisAppendCommand(c, redis_cmd); + } + for(i=0;itype==REDIS_REPLY_STRING) { rule_list[idx].table_line=_maat_strdup(reply->str); @@ -561,9 +567,6 @@ int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int ru { MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor ,"Redis cmd=%s Error, Reply type=%d, str=%s",redis_cmd, reply->type, reply->str); - freeReplyObject(reply); - free(retry_ids); - return -1; } else //Handle type "nil" { @@ -1046,7 +1049,7 @@ void check_maat_expiration(redisContext *ctx, void *logger) else { MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor - ,"Failed to expired %d/%d rules in Redis, try later.", s_rule_num-success_cnt,s_rule_num); + ,"Failed to expired %d of %d rules in Redis, try later.", s_rule_num-success_cnt,s_rule_num); } free(s_rule); @@ -1099,6 +1102,29 @@ void cleanup_update_status(redisContext *ctx, void *logger) ,version_num ,entry_num); +} +int redlock_try_lock(redisContext *ctx, const char* lock_name, long long expire) +{ + redisReply* reply=NULL; + int ret=0; + reply=_wrap_redisCommand(ctx,"SET %s locked NX PX %lld", lock_name, expire); + if(reply->type==REDIS_REPLY_NIL) + { + ret=0; + } + else + { + ret=1; + } + freeReplyObject(reply); + return ret; +} +void redlock_unlock(redisContext * ctx, const char * lock_name) +{ + redisReply* reply=NULL; + reply=_wrap_redisCommand(ctx,"DEL %s", lock_name); + freeReplyObject(reply); + } void redis_monitor_traverse(long long version,redisContext *c, void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para @@ -1118,8 +1144,12 @@ void redis_monitor_traverse(long long version,redisContext *c, if(feather->redis_write_ctx!=NULL&&feather->redis_write_ctx->err==0)//authorized to write { //For thread safe, deliberately use redis_read_ctx but not redis_write_ctx. - check_maat_expiration(feather->redis_read_ctx, logger); - cleanup_update_status(feather->redis_read_ctx, logger); + if(1==redlock_try_lock(feather->redis_read_ctx, rm_expire_lock, 300*1000)) + { + check_maat_expiration(feather->redis_read_ctx, logger); + cleanup_update_status(feather->redis_read_ctx, logger); + redlock_unlock(feather->redis_read_ctx,rm_expire_lock); + } } if(c==NULL||c->err) { @@ -1362,6 +1392,7 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru } } + _feather->line_cmd_acc_num+=success_cnt; if(retry>5) { MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_command diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp index 9e1a326..e1d1a41 100644 --- a/src/entry/Maat_rule.cpp +++ b/src/entry/Maat_rule.cpp @@ -30,7 +30,7 @@ #include "stream_fuzzy_hash.h" #include "gram_index_engine.h" -int MAAT_FRAME_VERSION_2_1_20180523=1; +int MAAT_FRAME_VERSION_2_2_20180524=1; const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", "unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""}; diff --git a/src/entry/Maat_rule_internal.h b/src/entry/Maat_rule_internal.h index 9b1f41d..8271abb 100644 --- a/src/entry/Maat_rule_internal.h +++ b/src/entry/Maat_rule_internal.h @@ -418,6 +418,7 @@ struct _Maat_feather_t long long postpone_q_size; long long compile_rule_num; long long cmd_acc_num; + long long line_cmd_acc_num; }; struct _maat_garbage_t { diff --git a/src/entry/Maat_stat.cpp b/src/entry/Maat_stat.cpp index b6ff57b..6658989 100644 --- a/src/entry/Maat_stat.cpp +++ b/src/entry/Maat_stat.cpp @@ -23,7 +23,8 @@ enum MAAT_FS_STATUS{ STATUS_ORPHAN_GROUP_SAVING, STATUS_LAST_REGION_SAVING, STATUS_CMD_NUM, - STATUS_CMD_Q_SIZE + STATUS_CMD_Q_SIZE, + STATUS_CMD_LINE_NUM }; enum MAAT_FS_COLUMN @@ -78,6 +79,7 @@ void maat_stat_init(struct _Maat_feather_t* feather) feather->fs_status_id[STATUS_SCAN_ERR_CNT]=FS_register(feather->stat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT,"scan_error"); feather->fs_status_id[STATUS_CMD_NUM]=FS_register(feather->stat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT,"cmd_commit"); feather->fs_status_id[STATUS_CMD_Q_SIZE]=FS_register(feather->stat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT,"cmd_in_q"); + feather->fs_status_id[STATUS_CMD_LINE_NUM]=FS_register(feather->stat_handle, FS_STYLE_STATUS, FS_CALC_SPEED,"line_cmd/s"); feather->fs_column_id[COLUMN_TABLE_RULE_NUM]=FS_register(feather->stat_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT,"rule"); feather->fs_column_id[COLUMN_TABLE_REGEX_NUM]=FS_register(feather->stat_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT,"regex"); @@ -184,6 +186,7 @@ void maat_stat_output(struct _Maat_feather_t* feather) FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_LAST_REGION_SAVING], 0,FS_OP_SET,last_region_saving); FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_CMD_NUM], 0,FS_OP_SET,feather->cmd_acc_num); FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_CMD_Q_SIZE], 0,FS_OP_SET,feather->cmd_q_cnt); + FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_CMD_LINE_NUM], 0,FS_OP_SET,feather->line_cmd_acc_num); value=MESA_lqueue_get_count(feather->garbage_q); FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_GARBAGE_QSIZE], 0,FS_OP_SET,value);