1、提供Maat_cmd_flushDB函数用于重置redis数据库,重置操作后版本号加1。
2、当ZRANGEBYSCORE结果为空的时(日志Got nothing after ),会全量加载。 3、exec_serial_rule事务失败时,重试5次。
This commit is contained in:
@@ -151,5 +151,6 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
|
|||||||
//If the key does not exist, it is set to 0 before performing the operation.
|
//If the key does not exist, it is set to 0 before performing the operation.
|
||||||
long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment);
|
long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment);
|
||||||
int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsigned int size);
|
int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsigned int size);
|
||||||
|
int Maat_cmd_flushDB(Maat_feather_t feather);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|||||||
@@ -437,7 +437,7 @@ int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** lis
|
|||||||
{
|
{
|
||||||
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative=%d"
|
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative=%d"
|
||||||
,rm_status_sset,version,target_version-1,!cumulative_off);
|
,rm_status_sset,version,target_version-1,!cumulative_off);
|
||||||
return -1;
|
goto FULL_UPDATE;
|
||||||
}
|
}
|
||||||
tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str);
|
tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str);
|
||||||
if(tmp_reply->type!=REDIS_REPLY_STRING)
|
if(tmp_reply->type!=REDIS_REPLY_STRING)
|
||||||
@@ -920,18 +920,19 @@ int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_
|
|||||||
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger)
|
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger)
|
||||||
{
|
{
|
||||||
int max_redis_batch=1*1024,batch_cnt=0;
|
int max_redis_batch=1*1024,batch_cnt=0;
|
||||||
int success_cnt=0,ret=0;
|
int success_cnt=0,ret=0, failed_batch=0;
|
||||||
while(success_cnt<serial_rule_num)
|
while(success_cnt<serial_rule_num&&failed_batch<5)
|
||||||
{
|
{
|
||||||
batch_cnt=MIN(serial_rule_num-success_cnt,max_redis_batch);
|
batch_cnt=MIN(serial_rule_num-success_cnt,max_redis_batch);
|
||||||
ret=_exec_serial_rule(ctx,s_rule+success_cnt,batch_cnt, server_time,logger);
|
ret=_exec_serial_rule(ctx,s_rule+success_cnt,batch_cnt, server_time,logger);
|
||||||
if(ret==1)
|
if(ret==1)
|
||||||
{
|
{
|
||||||
success_cnt+=batch_cnt;
|
success_cnt+=batch_cnt;
|
||||||
|
failed_batch=0;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
break;
|
failed_batch++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return success_cnt;
|
return success_cnt;
|
||||||
@@ -1583,4 +1584,68 @@ int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsi
|
|||||||
}
|
}
|
||||||
freeReplyObject(data_reply);
|
freeReplyObject(data_reply);
|
||||||
return i;
|
return i;
|
||||||
|
}
|
||||||
|
int redis_flush_DB(redisContext* ctx, int db_index, void* logger)
|
||||||
|
{
|
||||||
|
redisReply* data_reply=NULL;
|
||||||
|
long long maat_redis_version=0, dbsize=0;
|
||||||
|
int append_cmd_cnt=0, i=0;
|
||||||
|
int redis_transaction_success=1;
|
||||||
|
|
||||||
|
data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION");
|
||||||
|
freeReplyObject(data_reply);
|
||||||
|
data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION");
|
||||||
|
maat_redis_version=read_redis_integer(data_reply);
|
||||||
|
maat_redis_version++;
|
||||||
|
freeReplyObject(data_reply);
|
||||||
|
data_reply=_wrap_redisCommand(ctx, "DBSIZE");
|
||||||
|
dbsize=read_redis_integer(data_reply);
|
||||||
|
freeReplyObject(data_reply);
|
||||||
|
|
||||||
|
data_reply=_wrap_redisCommand(ctx,"MULTI");
|
||||||
|
redisAppendCommand(ctx,"FLUSHDB");
|
||||||
|
append_cmd_cnt++;
|
||||||
|
redisAppendCommand(ctx,"SET MAAT_VERSION %lld",maat_redis_version);
|
||||||
|
append_cmd_cnt++;
|
||||||
|
redisAppendCommand(ctx,"SET SEQUENCE_REGION 1",maat_redis_version);
|
||||||
|
append_cmd_cnt++;
|
||||||
|
redisAppendCommand(ctx,"SET SEQUENCE_GROUP 1",maat_redis_version);
|
||||||
|
append_cmd_cnt++;
|
||||||
|
redisAppendCommand(ctx,"EXEC");
|
||||||
|
|
||||||
|
for(i=0;i<append_cmd_cnt;i++)
|
||||||
|
{
|
||||||
|
_wrap_redisGetReply(ctx, &data_reply);
|
||||||
|
if(0==mr_transaction_success(data_reply))
|
||||||
|
{
|
||||||
|
redis_transaction_success=0;
|
||||||
|
}
|
||||||
|
freeReplyObject(data_reply);
|
||||||
|
}
|
||||||
|
if(redis_transaction_success==1)
|
||||||
|
{
|
||||||
|
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_command
|
||||||
|
,"FlushDB %d, MAAT_VERSION=%llu, DBSize=%llu."
|
||||||
|
,db_index, maat_redis_version-1,dbsize
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return redis_transaction_success;
|
||||||
|
}
|
||||||
|
int Maat_cmd_flushDB(Maat_feather_t feather)
|
||||||
|
{
|
||||||
|
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
|
||||||
|
int ret=0;
|
||||||
|
if(_feather->redis_write_ctx==NULL)
|
||||||
|
{
|
||||||
|
ret=connect_redis_for_write(_feather);
|
||||||
|
if(ret!=0)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
do
|
||||||
|
{
|
||||||
|
ret=redis_flush_DB(_feather->redis_write_ctx, _feather->redis_index, _feather->logger);
|
||||||
|
}while(ret==0);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,7 +30,7 @@
|
|||||||
#include "stream_fuzzy_hash.h"
|
#include "stream_fuzzy_hash.h"
|
||||||
#include "gram_index_engine.h"
|
#include "gram_index_engine.h"
|
||||||
|
|
||||||
int MAAT_FRAME_VERSION_2_1_20180307=1;
|
int MAAT_FRAME_VERSION_2_1_20180315=1;
|
||||||
|
|
||||||
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
|
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
|
||||||
"unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""};
|
"unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""};
|
||||||
|
|||||||
@@ -1059,6 +1059,7 @@ int main(int argc,char* argv[])
|
|||||||
test_command(feather);
|
test_command(feather);
|
||||||
test_set_cmd_line(feather);
|
test_set_cmd_line(feather);
|
||||||
test_add_ip_command(feather,"IP_CONFIG");
|
test_add_ip_command(feather,"IP_CONFIG");
|
||||||
|
Maat_cmd_flushDB(feather);
|
||||||
}
|
}
|
||||||
test_sfh_digest(test_digest_file);
|
test_sfh_digest(test_digest_file);
|
||||||
sleep(wait_second);
|
sleep(wait_second);
|
||||||
|
|||||||
Reference in New Issue
Block a user