1、使用分布式锁选举进行淘汰操作的写入者;2、提高obsolete状态的value读取速度;

This commit is contained in:
zhengchao
2018-05-24 15:45:46 +08:00
parent 3e07461a20
commit e709300b24
4 changed files with 44 additions and 9 deletions

View File

@@ -17,6 +17,7 @@ const char* rm_status_sset="MAAT_UPDATE_STATUS";
const char* rm_expire_sset="MAAT_EXPIRE_TIMER"; const char* rm_expire_sset="MAAT_EXPIRE_TIMER";
const char* rm_label_sset="MAAT_LABEL_INDEX"; const char* rm_label_sset="MAAT_LABEL_INDEX";
const char* rm_version_sset="MAAT_VERSION_TIMER"; const char* rm_version_sset="MAAT_VERSION_TIMER";
const char* rm_expire_lock="EXPIRE_OP_LOCK";
const static int MAAT_REDIS_SYNC_TIME=30*60; const static int MAAT_REDIS_SYNC_TIME=30*60;
@@ -552,7 +553,12 @@ int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int ru
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[MAAT_OP_DEL] snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%d",rm_key_prefix[MAAT_OP_DEL]
,rule_list[idx].table_name ,rule_list[idx].table_name
,rule_list[idx].rule_id); ,rule_list[idx].rule_id);
reply=_wrap_redisCommand(c, redis_cmd); ret=redisAppendCommand(c, redis_cmd);
}
for(i=0;i<failed_cnt;i++)
{
idx=retry_ids[i];
ret=_wrap_redisGetReply(c,&reply);
if(reply->type==REDIS_REPLY_STRING) if(reply->type==REDIS_REPLY_STRING)
{ {
rule_list[idx].table_line=_maat_strdup(reply->str); rule_list[idx].table_line=_maat_strdup(reply->str);
@@ -561,9 +567,6 @@ int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int ru
{ {
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor
,"Redis cmd=%s Error, Reply type=%d, str=%s",redis_cmd, reply->type, reply->str); ,"Redis cmd=%s Error, Reply type=%d, str=%s",redis_cmd, reply->type, reply->str);
freeReplyObject(reply);
free(retry_ids);
return -1;
} }
else //Handle type "nil" else //Handle type "nil"
{ {
@@ -1046,7 +1049,7 @@ void check_maat_expiration(redisContext *ctx, void *logger)
else else
{ {
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
,"Failed to expired %d/%d rules in Redis, try later.", s_rule_num-success_cnt,s_rule_num); ,"Failed to expired %d of %d rules in Redis, try later.", s_rule_num-success_cnt,s_rule_num);
} }
free(s_rule); free(s_rule);
@@ -1099,6 +1102,29 @@ void cleanup_update_status(redisContext *ctx, void *logger)
,version_num ,version_num
,entry_num); ,entry_num);
}
int redlock_try_lock(redisContext *ctx, const char* lock_name, long long expire)
{
redisReply* reply=NULL;
int ret=0;
reply=_wrap_redisCommand(ctx,"SET %s locked NX PX %lld", lock_name, expire);
if(reply->type==REDIS_REPLY_NIL)
{
ret=0;
}
else
{
ret=1;
}
freeReplyObject(reply);
return ret;
}
void redlock_unlock(redisContext * ctx, const char * lock_name)
{
redisReply* reply=NULL;
reply=_wrap_redisCommand(ctx,"DEL %s", lock_name);
freeReplyObject(reply);
} }
void redis_monitor_traverse(long long version,redisContext *c, void redis_monitor_traverse(long long version,redisContext *c,
void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
@@ -1118,8 +1144,12 @@ void redis_monitor_traverse(long long version,redisContext *c,
if(feather->redis_write_ctx!=NULL&&feather->redis_write_ctx->err==0)//authorized to write if(feather->redis_write_ctx!=NULL&&feather->redis_write_ctx->err==0)//authorized to write
{ {
//For thread safe, deliberately use redis_read_ctx but not redis_write_ctx. //For thread safe, deliberately use redis_read_ctx but not redis_write_ctx.
if(1==redlock_try_lock(feather->redis_read_ctx, rm_expire_lock, 300*1000))
{
check_maat_expiration(feather->redis_read_ctx, logger); check_maat_expiration(feather->redis_read_ctx, logger);
cleanup_update_status(feather->redis_read_ctx, logger); cleanup_update_status(feather->redis_read_ctx, logger);
redlock_unlock(feather->redis_read_ctx,rm_expire_lock);
}
} }
if(c==NULL||c->err) if(c==NULL||c->err)
{ {
@@ -1362,6 +1392,7 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
break; break;
} }
}
_feather->line_cmd_acc_num+=success_cnt; _feather->line_cmd_acc_num+=success_cnt;
if(retry>5) if(retry>5)
{ {

View File

@@ -30,7 +30,7 @@
#include "stream_fuzzy_hash.h" #include "stream_fuzzy_hash.h"
#include "gram_index_engine.h" #include "gram_index_engine.h"
int MAAT_FRAME_VERSION_2_1_20180523=1; int MAAT_FRAME_VERSION_2_2_20180524=1;
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
"unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""}; "unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""};

View File

@@ -418,6 +418,7 @@ struct _Maat_feather_t
long long postpone_q_size; long long postpone_q_size;
long long compile_rule_num; long long compile_rule_num;
long long cmd_acc_num; long long cmd_acc_num;
long long line_cmd_acc_num;
}; };
struct _maat_garbage_t struct _maat_garbage_t
{ {

View File

@@ -23,7 +23,8 @@ enum MAAT_FS_STATUS{
STATUS_ORPHAN_GROUP_SAVING, STATUS_ORPHAN_GROUP_SAVING,
STATUS_LAST_REGION_SAVING, STATUS_LAST_REGION_SAVING,
STATUS_CMD_NUM, STATUS_CMD_NUM,
STATUS_CMD_Q_SIZE STATUS_CMD_Q_SIZE,
STATUS_CMD_LINE_NUM
}; };
enum MAAT_FS_COLUMN enum MAAT_FS_COLUMN
@@ -78,6 +79,7 @@ void maat_stat_init(struct _Maat_feather_t* feather)
feather->fs_status_id[STATUS_SCAN_ERR_CNT]=FS_register(feather->stat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT,"scan_error"); feather->fs_status_id[STATUS_SCAN_ERR_CNT]=FS_register(feather->stat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT,"scan_error");
feather->fs_status_id[STATUS_CMD_NUM]=FS_register(feather->stat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT,"cmd_commit"); feather->fs_status_id[STATUS_CMD_NUM]=FS_register(feather->stat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT,"cmd_commit");
feather->fs_status_id[STATUS_CMD_Q_SIZE]=FS_register(feather->stat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT,"cmd_in_q"); feather->fs_status_id[STATUS_CMD_Q_SIZE]=FS_register(feather->stat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT,"cmd_in_q");
feather->fs_status_id[STATUS_CMD_LINE_NUM]=FS_register(feather->stat_handle, FS_STYLE_STATUS, FS_CALC_SPEED,"line_cmd/s");
feather->fs_column_id[COLUMN_TABLE_RULE_NUM]=FS_register(feather->stat_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT,"rule"); feather->fs_column_id[COLUMN_TABLE_RULE_NUM]=FS_register(feather->stat_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT,"rule");
feather->fs_column_id[COLUMN_TABLE_REGEX_NUM]=FS_register(feather->stat_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT,"regex"); feather->fs_column_id[COLUMN_TABLE_REGEX_NUM]=FS_register(feather->stat_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT,"regex");
@@ -184,6 +186,7 @@ void maat_stat_output(struct _Maat_feather_t* feather)
FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_LAST_REGION_SAVING], 0,FS_OP_SET,last_region_saving); FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_LAST_REGION_SAVING], 0,FS_OP_SET,last_region_saving);
FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_CMD_NUM], 0,FS_OP_SET,feather->cmd_acc_num); FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_CMD_NUM], 0,FS_OP_SET,feather->cmd_acc_num);
FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_CMD_Q_SIZE], 0,FS_OP_SET,feather->cmd_q_cnt); FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_CMD_Q_SIZE], 0,FS_OP_SET,feather->cmd_q_cnt);
FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_CMD_LINE_NUM], 0,FS_OP_SET,feather->line_cmd_acc_num);
value=MESA_lqueue_get_count(feather->garbage_q); value=MESA_lqueue_get_count(feather->garbage_q);
FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_GARBAGE_QSIZE], 0,FS_OP_SET,value); FS_operate(feather->stat_handle, feather->fs_status_id[STATUS_GARBAGE_QSIZE], 0,FS_OP_SET,value);