@@ -591,7 +591,7 @@ int get_rm_key_list(redisContext *c, long long instance_version, long long desir
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
|
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
|
||||||
"Inc Update form instance_version %lld to %lld (%d entries).",instance_version,target_version,rule_num);
|
"Inc Update from instance_version %lld to %lld (%d entries).",instance_version,target_version,rule_num);
|
||||||
*list=s_rule_array;
|
*list=s_rule_array;
|
||||||
*update_type=CM_UPDATE_TYPE_INC;
|
*update_type=CM_UPDATE_TYPE_INC;
|
||||||
*new_version=target_version;
|
*new_version=target_version;
|
||||||
@@ -1086,6 +1086,20 @@ long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
//parameters: 3 keys: MAAT_VERSION MAAT_UPDATE_STATUS MAAT_VERSION_TIMER, 2 args: TRANSACTION_VERSION SERVER_TIME
|
||||||
|
const char* lua_exec_done=
|
||||||
|
"\
|
||||||
|
local transaction_version=tonumber(ARGV[1])\n\
|
||||||
|
local maat_version=redis.call(\'incrby\', KEYS[1], 1)\n\
|
||||||
|
if(maat_version~=transaction_version) then\
|
||||||
|
local affected=redis.call(\'zrangebyscore\', KEYS[2], transaction_version, transaction_version);\
|
||||||
|
for k,v in pairs(affected) do\
|
||||||
|
redis.call(\'zadd\', KEYS[2], maat_version, v);\
|
||||||
|
end;\
|
||||||
|
end;\
|
||||||
|
redis.call(\'zadd\', KEYS[3], maat_version, ARGV[2]);\
|
||||||
|
return maat_version;\
|
||||||
|
";
|
||||||
redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt)
|
redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt)
|
||||||
{
|
{
|
||||||
redisReply* data_reply=NULL;
|
redisReply* data_reply=NULL;
|
||||||
@@ -1097,11 +1111,12 @@ redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version
|
|||||||
}
|
}
|
||||||
if(maat_redis_version>0)
|
if(maat_redis_version>0)
|
||||||
{
|
{
|
||||||
data_reply=_wrap_redisCommand(ctx,"ZADD %s NX %d %d",mr_version_sset,server_time,maat_redis_version);
|
data_reply=_wrap_redisCommand(ctx, "eval %s 3 MAAT_VERSION %s %s %lld %lld",
|
||||||
freeReplyObject(data_reply);
|
lua_exec_done,
|
||||||
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
|
mr_status_sset,
|
||||||
(*cnt)++;
|
mr_version_sset,
|
||||||
data_reply=_wrap_redisCommand(ctx,"INCRBY MAAT_VERSION 1");
|
maat_redis_version,
|
||||||
|
server_time);
|
||||||
freeReplyObject(data_reply);
|
freeReplyObject(data_reply);
|
||||||
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
|
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
|
||||||
(*cnt)++;
|
(*cnt)++;
|
||||||
@@ -1255,7 +1270,7 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
|
|||||||
unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end()
|
unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end()
|
||||||
|
|
||||||
struct expected_reply *expected_reply=(struct expected_reply*)calloc(sizeof(struct expected_reply), max_multi_cmd_num);
|
struct expected_reply *expected_reply=(struct expected_reply*)calloc(sizeof(struct expected_reply), max_multi_cmd_num);
|
||||||
long long new_version=0, transaction_finished_version=0;
|
long long transaction_version=0, transaction_finished_version=0;
|
||||||
int renew_num=0,ret=0;
|
int renew_num=0,ret=0;
|
||||||
for(i=0;i<serial_rule_num;i++)
|
for(i=0;i<serial_rule_num;i++)
|
||||||
{
|
{
|
||||||
@@ -1265,7 +1280,7 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ret=_exec_serial_rule_begin(ctx,serial_rule_num,renew_num, &renew_allowed, &new_version);
|
ret=_exec_serial_rule_begin(ctx, serial_rule_num, renew_num, &renew_allowed, &transaction_version);
|
||||||
if(ret!=0)//Preconditions for transaction are not satisfied.
|
if(ret!=0)//Preconditions for transaction are not satisfied.
|
||||||
{
|
{
|
||||||
success_cnt=-1;
|
success_cnt=-1;
|
||||||
@@ -1274,11 +1289,11 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
|
|||||||
while(success_cnt<serial_rule_num)
|
while(success_cnt<serial_rule_num)
|
||||||
{
|
{
|
||||||
batch_cnt=MIN(serial_rule_num-success_cnt, max_redis_batch);
|
batch_cnt=MIN(serial_rule_num-success_cnt, max_redis_batch);
|
||||||
_exec_serial_rule(ctx,new_version,s_rule+success_cnt,batch_cnt,expected_reply, &multi_cmd_cnt,success_cnt,renew_allowed);
|
_exec_serial_rule(ctx, transaction_version, s_rule+success_cnt, batch_cnt, expected_reply, &multi_cmd_cnt,success_cnt,renew_allowed);
|
||||||
assert(multi_cmd_cnt<max_multi_cmd_num);
|
assert(multi_cmd_cnt<max_multi_cmd_num);
|
||||||
success_cnt+=batch_cnt;
|
success_cnt+=batch_cnt;
|
||||||
}
|
}
|
||||||
transaction_reply=_exec_serial_rule_end(ctx,new_version,server_time, renew_allowed, expected_reply, &multi_cmd_cnt);
|
transaction_reply=_exec_serial_rule_end(ctx, transaction_version, server_time, renew_allowed, expected_reply, &multi_cmd_cnt);
|
||||||
if(1==mr_transaction_success(transaction_reply))
|
if(1==mr_transaction_success(transaction_reply))
|
||||||
{
|
{
|
||||||
assert(transaction_reply->elements==multi_cmd_cnt);
|
assert(transaction_reply->elements==multi_cmd_cnt);
|
||||||
@@ -1305,14 +1320,14 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
|
|||||||
{
|
{
|
||||||
success_cnt=-1;
|
success_cnt=-1;
|
||||||
}
|
}
|
||||||
if(new_version>0)
|
if(transaction_version>0)
|
||||||
{
|
{
|
||||||
transaction_finished_version=read_redis_integer(transaction_reply->element[multi_cmd_cnt-1]);
|
transaction_finished_version=read_redis_integer(transaction_reply->element[multi_cmd_cnt-1]);
|
||||||
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_command,
|
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_command,
|
||||||
"Redis transaction version: %lld, transaction finished version: %lld, status: %s",
|
"Redis transaction version: %lld, transaction finished version: %lld, status: %s",
|
||||||
new_version,
|
transaction_version,
|
||||||
transaction_finished_version,
|
transaction_finished_version,
|
||||||
transaction_finished_version==new_version?"OK":"Weird");
|
transaction_finished_version==transaction_version?"OK":"Weird");
|
||||||
}
|
}
|
||||||
|
|
||||||
freeReplyObject(transaction_reply);
|
freeReplyObject(transaction_reply);
|
||||||
|
|||||||
@@ -32,7 +32,7 @@
|
|||||||
#include "stream_fuzzy_hash.h"
|
#include "stream_fuzzy_hash.h"
|
||||||
#include "gram_index_engine.h"
|
#include "gram_index_engine.h"
|
||||||
|
|
||||||
int MAAT_FRAME_VERSION_2_6_20190220=1;
|
int MAAT_FRAME_VERSION_2_6_20190221=1;
|
||||||
|
|
||||||
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
|
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
|
||||||
"unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""};
|
"unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""};
|
||||||
|
|||||||
Reference in New Issue
Block a user