1.【Bug修复】修复MAAT_OPT_CUMULATIVE_UPDATE_OFF启用时,导致增量版本号被跳过的bug。

2.maat_redis_tool增加-n参数,可以指定使用的db;
3.【重要更新】maat command批量写入机制调整,去除单次最多1024条配置的限制。
This commit is contained in:
zhengchao
2018-03-22 21:23:33 +08:00
parent 2bd813ffbc
commit c694922aa6
3 changed files with 121 additions and 69 deletions

View File

@@ -406,13 +406,15 @@ int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** lis
} }
if(version_in_redis>version&&cumulative_off==1) if(version_in_redis>version&&cumulative_off==1)
{ {
target_version=version+1; target_version=version;
} }
else else
{ {
target_version=version_in_redis; target_version=version_in_redis-1;
} }
do{ do{
target_version++;
//Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis. //Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis.
//The elements are considered to be ordered from low to high scores(version). //The elements are considered to be ordered from low to high scores(version).
reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",rm_status_sset,version,target_version); reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",rm_status_sset,version,target_version);
@@ -431,19 +433,19 @@ int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** lis
//a duplicate rule_id would induce this error. //a duplicate rule_id would induce this error.
freeReplyObject(reply); freeReplyObject(reply);
} }
target_version++;
}while(rule_num==0&&target_version<=version_in_redis&&cumulative_off==1); }while(rule_num==0&&target_version<=version_in_redis&&cumulative_off==1);
if(rule_num==0) if(rule_num==0)
{ {
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative=%d" MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s"
,rm_status_sset,version,target_version-1,!cumulative_off); ,rm_status_sset,version,target_version-1,cumulative_off==1?"OFF":"ON");
goto FULL_UPDATE; goto FULL_UPDATE;
} }
tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str); tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str);
if(tmp_reply->type!=REDIS_REPLY_STRING) if(tmp_reply->type!=REDIS_REPLY_STRING)
{ {
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"ZSCORE %s %s failed Version: %lld->%lld",rm_status_sset,reply->element[0]->str,version, version_in_redis); "ZSCORE %s %s failed Version: %lld->%lld",rm_status_sset,reply->element[0]->str,version, target_version);
free(tmp_reply); free(tmp_reply);
free(reply); free(reply);
goto FULL_UPDATE; goto FULL_UPDATE;
@@ -460,7 +462,7 @@ int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** lis
goto FULL_UPDATE; goto FULL_UPDATE;
} }
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Inc Update form version %lld to %lld (%lld entries).",version,version_in_redis,reply->elements); "Inc Update form version %lld to %lld (%lld entries).",version,target_version,reply->elements);
s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t)); s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t));
for(i=0;i<reply->elements;i++) for(i=0;i<reply->elements;i++)
@@ -484,7 +486,7 @@ int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** lis
*list=s_rule; *list=s_rule;
*update_type=CM_UPDATE_TYPE_INC; *update_type=CM_UPDATE_TYPE_INC;
freeReplyObject(reply); freeReplyObject(reply);
*new_version=version_in_redis; *new_version=target_version;
return i; return i;
FULL_UPDATE: FULL_UPDATE:
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
@@ -826,13 +828,10 @@ int mr_operation_success(redisReply* data_reply)
return 1; return 1;
} }
#define REDIS_OP_PER_SRULE 8 long long _exec_serial_rule_begin(redisContext* ctx)
int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger)
{ {
int i=0,j=0,ret=0;
long long maat_redis_version=0; long long maat_redis_version=0;
redisReply* data_reply=NULL; redisReply* data_reply=NULL;
int redis_transaction_success=1;
data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION"); data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION");
freeReplyObject(data_reply); freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION"); data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION");
@@ -840,12 +839,25 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_
maat_redis_version++; maat_redis_version++;
freeReplyObject(data_reply); freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx,"MULTI"); data_reply=_wrap_redisCommand(ctx,"MULTI");
return maat_redis_version;
}
redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time)
{
redisReply* data_reply=NULL;
data_reply=_wrap_redisCommand(ctx,"ZADD %s NX %d %d",rm_version_sset,server_time,maat_redis_version);
freeReplyObject(data_reply); freeReplyObject(data_reply);
int max_append_cnt=serial_rule_num*REDIS_OP_PER_SRULE+4; data_reply=_wrap_redisCommand(ctx,"INCRBY MAAT_VERSION 1");
freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx,"EXEC");
return data_reply;
}
void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_t* s_rule, int rule_num, int* multi_cmd_seq, unsigned int *cnt, int offset)
{
int i=0,j=0,ret=0;
redisReply* data_reply=NULL;
int append_cmd_cnt=0; int append_cmd_cnt=0;
int *pipeline_seq=(int*)calloc(sizeof(int),max_append_cnt); for(i=0;i<rule_num;i++)
assert(server_time>0);
for(i=0;i<serial_rule_num;i++)
{ {
if(s_rule[i].op==MAAT_OP_ADD) if(s_rule[i].op==MAAT_OP_ADD)
{ {
@@ -853,14 +865,14 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_
,s_rule[i].table_name ,s_rule[i].table_name
,s_rule[i].rule_id ,s_rule[i].rule_id
,s_rule[i].table_line); ,s_rule[i].table_line);
pipeline_seq[append_cmd_cnt]=i; multi_cmd_seq[(*cnt)++]=i+offset;
append_cmd_cnt++; append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements. //NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"ZADD %s NX %lld ADD,%s,%d",rm_status_sset redisAppendCommand(ctx,"ZADD %s NX %lld ADD,%s,%d",rm_status_sset
,maat_redis_version ,version
,s_rule[i].table_name ,s_rule[i].table_name
,s_rule[i].rule_id); ,s_rule[i].rule_id);
pipeline_seq[append_cmd_cnt]=i; multi_cmd_seq[(*cnt)++]=i+offset;
append_cmd_cnt++; append_cmd_cnt++;
if(s_rule[i].timeout>0) if(s_rule[i].timeout>0)
{ {
@@ -868,7 +880,7 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_
,s_rule[i].timeout ,s_rule[i].timeout
,s_rule[i].table_name ,s_rule[i].table_name
,s_rule[i].rule_id); ,s_rule[i].rule_id);
pipeline_seq[append_cmd_cnt]=i; multi_cmd_seq[(*cnt)++]=i+offset;
append_cmd_cnt++; append_cmd_cnt++;
} }
if(s_rule[i].label_id>0) if(s_rule[i].label_id>0)
@@ -876,65 +888,74 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_
redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset
,s_rule[i].label_id ,s_rule[i].label_id
,s_rule[i].rule_id); ,s_rule[i].rule_id);
pipeline_seq[append_cmd_cnt]=i; multi_cmd_seq[(*cnt)++]=i+offset;
append_cmd_cnt++; append_cmd_cnt++;
} }
} }
else else
{ {
ret=del_rule_from_redis(ctx,s_rule+i,maat_redis_version); ret=del_rule_from_redis(ctx,s_rule+i,version);
for(j=0;j<ret;j++) for(j=0;j<ret;j++)
{ {
pipeline_seq[append_cmd_cnt+j]=i; multi_cmd_seq[(*cnt)++]=i+offset;
} }
append_cmd_cnt+=ret; append_cmd_cnt+=ret;
} }
} }
redisAppendCommand(ctx,"INCRBY MAAT_VERSION 1");
append_cmd_cnt++;
redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_version_sset,server_time,maat_redis_version);
append_cmd_cnt++;
redisAppendCommand(ctx,"EXEC");
append_cmd_cnt++;
redis_transaction_success=1;
for(i=0;i<append_cmd_cnt;i++) for(i=0;i<append_cmd_cnt;i++)
{ {
_wrap_redisGetReply(ctx, &data_reply); _wrap_redisGetReply(ctx, &data_reply);
if(0==mr_transaction_success(data_reply))
{
redis_transaction_success=0;
}
if(0==mr_operation_success(data_reply))
{
j=pipeline_seq[i];
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
,"exec rule %s, %d failed, content %s, rule id maybe conflicts.", s_rule[j].rule_id, s_rule[j].table_name, s_rule[j].table_line);
}
freeReplyObject(data_reply); freeReplyObject(data_reply);
} }
free(pipeline_seq); return;
return redis_transaction_success;
} }
#define MAX_REDIS_OP_PER_SRULE 8
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger) int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger)
{ {
int max_redis_batch=1*1024,batch_cnt=0; int max_redis_batch=1*1024,batch_cnt=0;
int success_cnt=0,ret=0, failed_batch=0; int success_cnt=0,j=0;
while(success_cnt<serial_rule_num&&failed_batch<5) redisReply*transaction_reply=NULL,*p=NULL;
unsigned int i=0;
unsigned int multi_cmd_cnt=0;
unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end()
int *multi_cmd_seq=(int*)calloc(sizeof(int), max_multi_cmd_num);
long long new_version=0;
new_version=_exec_serial_rule_begin(ctx);
while(success_cnt<serial_rule_num)
{ {
batch_cnt=MIN(serial_rule_num-success_cnt,max_redis_batch); batch_cnt=MIN(serial_rule_num-success_cnt,max_redis_batch);
ret=_exec_serial_rule(ctx,s_rule+success_cnt,batch_cnt, server_time,logger); _exec_serial_rule(ctx,new_version,s_rule+success_cnt,batch_cnt,multi_cmd_seq, &multi_cmd_cnt,success_cnt);
if(ret==1) assert(multi_cmd_cnt<max_multi_cmd_num);
{
success_cnt+=batch_cnt; success_cnt+=batch_cnt;
failed_batch=0; }
transaction_reply=_exec_serial_rule_end(ctx,new_version,server_time);
if(1==mr_transaction_success(transaction_reply))
{
assert(transaction_reply->elements==multi_cmd_cnt+2);
for(i=0;i<transaction_reply->elements;i++)
{
p=transaction_reply->element[i];
if(0==mr_operation_success(p))
{
j=multi_cmd_seq[i];
assert(j<serial_rule_num);
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
,"exec rule %s, %d failed, rule id maybe conflicts.", s_rule[j].table_name,s_rule[j].rule_id);
success_cnt--;
}
}
} }
else else
{ {
failed_batch++; success_cnt=-1;
}
} }
freeReplyObject(transaction_reply);
free(multi_cmd_seq);
return success_cnt; return success_cnt;
} }
@@ -1325,12 +1346,20 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
} }
set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id,line_rule[i]->table_name,line_rule[i]->table_line, absolute_expire_time); set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id,line_rule[i]->table_name,line_rule[i]->table_line, absolute_expire_time);
} }
ret=0; ret=0;
while(success_cnt<line_num&&retry<10)
{
success_cnt=exec_serial_rule(_feather->redis_write_ctx,s_rule, line_num,server_time,_feather->logger);
if(success_cnt<0)//transaction failed if(success_cnt<0)//transaction failed
{
{ {
retry++; retry++;
} }
else
{
break;
}
}
if(retry>5) if(retry>5)
{ {
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_command MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_command
@@ -1507,13 +1536,17 @@ int Maat_cmd_commit(Maat_feather_t feather)
p=p->next; p=p->next;
} }
assert(serial_rule_idx==serial_rule_num); assert(serial_rule_idx==serial_rule_num);
transection_success=0; transection_success=0;
while(transection_success<serial_rule_num&&retry<10) while(transection_success<serial_rule_num&&retry<10)
{ {
transection_success+=exec_serial_rule(ctx, s_rule+transection_success,serial_rule_num-transection_success,_feather->server_time,_feather->logger); transection_success=exec_serial_rule(ctx, s_rule,serial_rule_num,_feather->server_time,_feather->logger);
if(transection_success==-1) if(transection_success==-1)
{ {
retry++; retry++;
}
else
{
break;
} }
} }
if(retry>5) if(retry>5)
@@ -1594,9 +1627,16 @@ int redis_flush_DB(redisContext* ctx, int db_index, void* logger)
data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION"); data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION");
freeReplyObject(data_reply); freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION");
if(data_reply->type==REDIS_REPLY_NIL)
{
maat_redis_version=0;
}
else
{ {
maat_redis_version=read_redis_integer(data_reply); maat_redis_version=read_redis_integer(data_reply);
maat_redis_version++; maat_redis_version++;
freeReplyObject(data_reply);
} }
data_reply=_wrap_redisCommand(ctx, "DBSIZE"); data_reply=_wrap_redisCommand(ctx, "DBSIZE");
dbsize=read_redis_integer(data_reply); dbsize=read_redis_integer(data_reply);
@@ -1626,7 +1666,7 @@ int redis_flush_DB(redisContext* ctx, int db_index, void* logger)
if(redis_transaction_success==1) if(redis_transaction_success==1)
{ {
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_command MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_command
,"FlushDB %d, MAAT_VERSION=%llu, DBSize=%llu." ,"FlushDB %d, MAAT_VERSION=%llu, DBSize=%llu."
,db_index, (maat_redis_version==0)?0:(maat_redis_version-1),dbsize ,db_index, (maat_redis_version==0)?0:(maat_redis_version-1),dbsize
); );
} }

View File

@@ -30,7 +30,7 @@
#include "stream_fuzzy_hash.h" #include "stream_fuzzy_hash.h"
#include "gram_index_engine.h" #include "gram_index_engine.h"
int MAAT_FRAME_VERSION_2_1_20180316=1; int MAAT_FRAME_VERSION_2_1_20180322=1;
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
"unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""}; "unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""};

View File

@@ -19,6 +19,7 @@ void maat_tool_print_usage(void)
printf("Usage:\n"); printf("Usage:\n");
printf("\t-h [host], redis IP, 127.0.0.1 as default.\n"); printf("\t-h [host], redis IP, 127.0.0.1 as default.\n");
printf("\t-p [port], redis port, 6379 as default.\n"); printf("\t-p [port], redis port, 6379 as default.\n");
printf("\t-n [db], redis db, 0 as default.\n");
printf("\t-d [dir], dump rules from redis to [dir], %s as default.\n",redis_dump_dir); printf("\t-d [dir], dump rules from redis to [dir], %s as default.\n",redis_dump_dir);
printf("\t-j [payload.json], add or delete rules as maat json. Must have field compile_table field, and plugin table's valid flag must be in the last column.\n"); printf("\t-j [payload.json], add or delete rules as maat json. Must have field compile_table field, and plugin table's valid flag must be in the last column.\n");
printf("\t-t [timeout], timeout config after t seconds, default 0, not timeout.\n"); printf("\t-t [timeout], timeout config after t seconds, default 0, not timeout.\n");
@@ -41,6 +42,7 @@ static redisContext * connect_redis(const char*redis_ip, int redis_port, int red
struct timeval connect_timeout; struct timeval connect_timeout;
connect_timeout.tv_sec=0; connect_timeout.tv_sec=0;
connect_timeout.tv_usec=100*1000; // 100 ms connect_timeout.tv_usec=100*1000; // 100 ms
redisReply* reply=NULL;
redisContext * ctx; redisContext * ctx;
ctx=redisConnectWithTimeout(redis_ip, redis_port,connect_timeout); ctx=redisConnectWithTimeout(redis_ip, redis_port,connect_timeout);
@@ -49,6 +51,9 @@ static redisContext * connect_redis(const char*redis_ip, int redis_port, int red
printf("Unable to connect %s:%d db%d\n",redis_ip,redis_port,redis_db); printf("Unable to connect %s:%d db%d\n",redis_ip,redis_port,redis_db);
return NULL; return NULL;
} }
reply=_wrap_redisCommand(ctx, "select %d",redis_db);
freeReplyObject(reply);
return ctx; return ctx;
} }
@@ -223,7 +228,7 @@ int main(int argc, char * argv[])
unsigned long json_file_size=0,read_size=0; unsigned long json_file_size=0,read_size=0;
char* json_buff=NULL; char* json_buff=NULL;
while((oc=getopt(argc,argv,"h:p:d:f:j:t:"))!=-1) while((oc=getopt(argc,argv,"h:p:n:d:f:j:t:"))!=-1)
{ {
switch(oc) switch(oc)
{ {
@@ -233,6 +238,9 @@ int main(int argc, char * argv[])
case 'p': case 'p':
sscanf(optarg,"%d",&redis_port); sscanf(optarg,"%d",&redis_port);
break; break;
case 'n':
sscanf(optarg,"%d",&redis_db);
break;
case 'd': case 'd':
model=WORK_MODE_DUMP; model=WORK_MODE_DUMP;
strncpy(dump_dir,optarg,sizeof(dump_dir)); strncpy(dump_dir,optarg,sizeof(dump_dir));
@@ -321,9 +329,13 @@ int main(int argc, char * argv[])
config_monitor_traverse(0, tmp_iris_path, NULL, make_serial_rule, NULL, s_rule,NULL, NULL); config_monitor_traverse(0, tmp_iris_path, NULL, make_serial_rule, NULL, s_rule,NULL, NULL);
printf("Timeout=%lld\n",absolute_expire_time); printf("Timeout=%lld\n",absolute_expire_time);
ret=0; ret=0;
while(success_cnt<total_line_cnt) do
{ {
success_cnt+=exec_serial_rule(ctx,s_rule+success_cnt, total_line_cnt-success_cnt, server_time, NULL); success_cnt=exec_serial_rule(ctx,s_rule, total_line_cnt, server_time, NULL);
}while(success_cnt<0);
if(success_cnt!=total_line_cnt)
{
printf("Only Add %d of %d, rule id maybe conflicts.\n",success_cnt,total_line_cnt);
} }
for(i=0;i<total_line_cnt;i++) for(i=0;i<total_line_cnt;i++)
{ {