#5 允许配置ID重用;提供工具函数Maat_helper_read_column,用于处理行列。

This commit is contained in:
zhengchao
2018-12-21 20:16:22 +06:00
parent a53cbca268
commit bb7710eb37
5 changed files with 218 additions and 119 deletions

View File

@@ -258,6 +258,12 @@ int Maat_rule_get_ex_new_index(Maat_feather_t feather, const char* compile_table
//returned data is duplicated by dup_func of Maat_rule_get_ex_new_index, caller is responsible to free the data.
MAAT_RULE_EX_DATA Maat_rule_get_ex_data(Maat_feather_t feather, const struct Maat_rule_t* rule, int idx);
//Helper function for parsing space or tab seperated line.
//column_seq: column sequence is numberd from 1.
//Return 0 if success.
int Maat_helper_read_column(const char* line, int column_seq, size_t *column_offset, size_t *column_len);
//Following functions are similar to Maat_rule_get_ex_data, except they are effective on plugin table.
typedef void* MAAT_PLUGIN_EX_DATA;
typedef void Maat_plugin_EX_new_func_t(int table_id, const char* key, const char* table_line, MAAT_PLUGIN_EX_DATA* ad, long argl, void *argp);

View File

@@ -2281,4 +2281,8 @@ int Maat_read_state(Maat_feather_t feather,enum MAAT_STATE_OPT type, void* valu
}
return 0;
}
int Maat_helper_read_column(const char* line, int column_seq, size_t *column_offset, size_t *column_len)
{
return get_column_pos(line, column_seq, column_offset, column_len);
}

View File

@@ -14,15 +14,15 @@
#define maat_redis_monitor (module_name_str("MAAT_REDIS_MONITOR"))
#define maat_command (module_name_str("MAAT_COMMAND"))
const time_t MAAT_REDIS_RECONNECT_INTERVAL=5;
const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"};
const char* rm_status_sset="MAAT_UPDATE_STATUS";
const char* rm_expire_sset="MAAT_EXPIRE_TIMER";
const char* rm_label_sset="MAAT_LABEL_INDEX";
const char* rm_version_sset="MAAT_VERSION_TIMER";
const char* rm_expire_lock="EXPIRE_OP_LOCK";
const long rm_expire_lock_timeout=300*1000;
const char* mr_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"};
const char* mr_status_sset="MAAT_UPDATE_STATUS";
const char* mr_expire_sset="MAAT_EXPIRE_TIMER";
const char* mr_label_sset="MAAT_LABEL_INDEX";
const char* mr_version_sset="MAAT_VERSION_TIMER";
const char* mr_expire_lock="EXPIRE_OP_LOCK";
const long mr_expire_lock_timeout=300*1000;
const static int MAAT_REDIS_SYNC_TIME=30*60;
const char* rm_op_str[]={"DEL","ADD","RENEW_TIMEOUT"};
const char* mr_op_str[]={"DEL","ADD","RENEW_TIMEOUT"};
const char* foreign_source_prefix="redis://";
const char* foreign_key_prefix="__FILE_";
@@ -356,13 +356,13 @@ int get_inc_key_list(long long instance_version, long long target_version, redis
//Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version.
//The elements are considered to be ordered from low to high scores(instance_version).
reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",rm_status_sset,instance_version,target_version);
reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",mr_status_sset,instance_version,target_version);
if(reply==NULL)
{
__redis_strerror_r(errno,err_buff,sizeof(err_buff)-1);
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"GET %s failed %s.",rm_status_sset,err_buff);
"GET %s failed %s.",mr_status_sset,err_buff);
return -1;
}
assert(reply->type==REDIS_REPLY_ARRAY);
@@ -373,11 +373,11 @@ int get_inc_key_list(long long instance_version, long long target_version, redis
return 0;
}
tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str);
tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",mr_status_sset,reply->element[0]->str);
if(tmp_reply->type!=REDIS_REPLY_STRING)
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"ZSCORE %s %s failed Version: %lld->%lld",rm_status_sset,reply->element[0]->str,instance_version, target_version);
"ZSCORE %s %s failed Version: %lld->%lld",mr_status_sset,reply->element[0]->str,instance_version, target_version);
freeReplyObject(tmp_reply);
freeReplyObject(reply);
return -1;
@@ -589,7 +589,7 @@ int get_rm_key_list(redisContext *c, long long instance_version, long long desir
if(rule_num==0)
{
MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s"
,rm_status_sset,instance_version,target_version-1,cumulative_off==1?"OFF":"ON");
,mr_status_sset,instance_version,target_version-1,cumulative_off==1?"OFF":"ON");
return 0;
}
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
@@ -707,7 +707,7 @@ int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int ru
redisReply* reply=NULL;
for(i=0;i<rule_num;i++)
{
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%ld",rm_key_prefix[rule_list[i].op]
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%ld",mr_key_prefix[rule_list[i].op]
,rule_list[i].table_name
,rule_list[i].rule_id);
ret=redisAppendCommand(c, redis_cmd);
@@ -730,7 +730,7 @@ int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int ru
else
{
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
,"Redis GET %s:%s,%d failed",rm_key_prefix[rule_list[i].op]
,"Redis GET %s:%s,%d failed",mr_key_prefix[rule_list[i].op]
,rule_list[i].table_name
,rule_list[i].rule_id);
error_happened=1;
@@ -747,7 +747,7 @@ int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int ru
for(i=0;i<failed_cnt;i++)
{
idx=retry_ids[i];
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%ld",rm_key_prefix[MAAT_OP_DEL]
snprintf(redis_cmd,sizeof(redis_cmd),"GET %s:%s,%ld",mr_key_prefix[MAAT_OP_DEL]
,rule_list[idx].table_name
,rule_list[idx].rule_id);
ret=redisAppendCommand(c, redis_cmd);
@@ -992,19 +992,7 @@ int mr_transaction_success(redisReply* data_reply)
return 1;
}
}
int mr_operation_success(redisReply* actual_reply, redisReply* expected_reply)
{
if(expected_reply->type!=actual_reply->type)
{
return 0;
}
if(expected_reply->type==REDIS_REPLY_INTEGER&&expected_reply->integer!=actual_reply->integer)
{
return 0;
}
return 1;
}
int redlock_try_lock(redisContext *ctx, const char* lock_name, long long expire)
{
redisReply* reply=NULL;
@@ -1028,11 +1016,47 @@ void redlock_unlock(redisContext * ctx, const char * lock_name)
freeReplyObject(reply);
}
struct expected_reply_t
#define POSSIBLE_REDIS_REPLY_SIZE 2
struct expected_reply
{
int srule_seq;
redisReply reply;
int possible_reply_num;
redisReply possible_replies[POSSIBLE_REDIS_REPLY_SIZE];
};
void expected_reply_add(struct expected_reply* expected, int srule_seq, int type, long long integer)
{
int i=expected->possible_reply_num;
assert(i<POSSIBLE_REDIS_REPLY_SIZE);
expected->srule_seq=srule_seq;
expected->possible_replies[i].type=type;
expected->possible_replies[i].integer=integer;
expected->possible_reply_num++;
}
int mr_operation_success(redisReply* actual_reply, struct expected_reply* expected)
{
int i=0;
if(expected->possible_replies[0].type!=actual_reply->type)
{
return 0;
}
for(i=0; i< expected->possible_reply_num; i++)
{
if(expected->possible_replies[i].type==REDIS_REPLY_INTEGER &&
expected->possible_replies[i].type==actual_reply->type &&
expected->possible_replies[i].integer==actual_reply->integer)
{
return 1;
}
if(expected->possible_replies[i].type==REDIS_REPLY_STATUS &&
expected->possible_replies[i].type==actual_reply->type &&
0==strcasecmp(actual_reply->str, "OK"))
{
return 1;
}
}
return 0;
}
long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *maat_redis_version)
{
@@ -1040,7 +1064,7 @@ long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule
redisReply* data_reply=NULL;
if(renew_rule_num>0)
{
while(0==redlock_try_lock(ctx, rm_expire_lock, rm_expire_lock_timeout))
while(0==redlock_try_lock(ctx, mr_expire_lock, mr_expire_lock_timeout))
{
usleep(1000);
}
@@ -1064,31 +1088,30 @@ long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule
}
return ret;
}
redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time, int renew_allowed, struct expected_reply_t* expect_reply, unsigned int *cnt)
redisReply* _exec_serial_rule_end(redisContext* ctx,long long maat_redis_version, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt)
{
redisReply* data_reply=NULL;
if(renew_allowed==1)
{
redlock_unlock(ctx, rm_expire_lock);
redlock_unlock(ctx, mr_expire_lock);
expect_reply[*cnt].srule_seq=-1;
(*cnt)++;
}
if(maat_redis_version>0)
{
data_reply=_wrap_redisCommand(ctx,"ZADD %s NX %d %d",rm_version_sset,server_time,maat_redis_version);
data_reply=_wrap_redisCommand(ctx,"ZADD %s NX %d %d",mr_version_sset,server_time,maat_redis_version);
freeReplyObject(data_reply);
expect_reply[*cnt].srule_seq=-1;
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
(*cnt)++;
data_reply=_wrap_redisCommand(ctx,"INCRBY MAAT_VERSION 1");
freeReplyObject(data_reply);
expect_reply[*cnt].srule_seq=-1;
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
(*cnt)++;
}
data_reply=_wrap_redisCommand(ctx,"EXEC");
return data_reply;
}
void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply_t* expect_reply, unsigned int *cnt, int offset,int renew_allowed)
void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply* expect_reply, unsigned int *cnt, int offset,int renew_allowed)
{
redisReply* data_reply=NULL;
unsigned int append_cmd_cnt=0, i=0;
@@ -1097,98 +1120,99 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_
switch(s_rule[i].op)
{
case MAAT_OP_ADD:
redisAppendCommand(ctx,"SET %s:%s,%d %s",rm_key_prefix[MAAT_OP_ADD]
,s_rule[i].table_name
,s_rule[i].rule_id
,s_rule[i].table_line);
expect_reply[*cnt].srule_seq=i+offset;
expect_reply[*cnt].reply.type=REDIS_REPLY_STATUS;
redisAppendCommand(ctx,"SET %s:%s,%d %s",
mr_key_prefix[MAAT_OP_ADD],
s_rule[i].table_name,
s_rule[i].rule_id,
s_rule[i].table_line);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0);
(*cnt)++;
append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"ZADD %s NX %lld ADD,%s,%d",rm_status_sset
,version
,s_rule[i].table_name
,s_rule[i].rule_id);
expect_reply[*cnt].srule_seq=i+offset;
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
expect_reply[*cnt].reply.integer=1;
//Allowing add duplicated members for rule id recycling.
redisAppendCommand(ctx,"ZADD %s %lld ADD,%s,%d",
mr_status_sset,
version,
s_rule[i].table_name,
s_rule[i].rule_id);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
(*cnt)++;
append_cmd_cnt++;
if(s_rule[i].timeout>0)
{
redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset
,s_rule[i].timeout
,s_rule[i].table_name
,s_rule[i].rule_id);
expect_reply[*cnt].srule_seq=i+offset;
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
expect_reply[*cnt].reply.integer=1;
redisAppendCommand(ctx,"ZADD %s %lld %s,%d",
mr_expire_sset,
s_rule[i].timeout,
s_rule[i].table_name,
s_rule[i].rule_id);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
(*cnt)++;
append_cmd_cnt++;
}
if(s_rule[i].label_id>0)
{
redisAppendCommand(ctx,"ZADD %s NX %d %s,%d",
rm_label_sset,
redisAppendCommand(ctx,"ZADD %s %d %s,%d",
mr_label_sset,
s_rule[i].label_id,
s_rule[i].table_name,
s_rule[i].rule_id);
expect_reply[*cnt].srule_seq=i+offset;
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
expect_reply[*cnt].reply.integer=1;
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
(*cnt)++;
append_cmd_cnt++;
}
break;
case MAAT_OP_DEL:
redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d"
,rm_key_prefix[MAAT_OP_ADD]
,s_rule[i].table_name
,s_rule[i].rule_id
,rm_key_prefix[MAAT_OP_DEL]
,s_rule[i].table_name
,s_rule[i].rule_id
redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d",
mr_key_prefix[MAAT_OP_ADD],
s_rule[i].table_name,
s_rule[i].rule_id,
mr_key_prefix[MAAT_OP_DEL],
s_rule[i].table_name,
s_rule[i].rule_id
);
expect_reply[*cnt].srule_seq=i+offset;
expect_reply[*cnt].reply.type=REDIS_REPLY_STATUS;
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0);
(*cnt)++;
append_cmd_cnt++;
redisAppendCommand(ctx,"EXPIRE %s:%s,%d %d",rm_key_prefix[MAAT_OP_DEL]
,s_rule[i].table_name
,s_rule[i].rule_id
,MAAT_REDIS_SYNC_TIME);
expect_reply[*cnt].srule_seq=i+offset;
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
expect_reply[*cnt].reply.integer=1;
redisAppendCommand(ctx,"EXPIRE %s:%s,%d %d",
mr_key_prefix[MAAT_OP_DEL],
s_rule[i].table_name,
s_rule[i].rule_id,
MAAT_REDIS_SYNC_TIME);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
(*cnt)++;
append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"ZADD %s NX %d DEL,%s,%d",rm_status_sset
,version
,s_rule[i].table_name
,s_rule[i].rule_id);
expect_reply[*cnt].srule_seq=i+offset;
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
expect_reply[*cnt].reply.integer=1;
redisAppendCommand(ctx,"ZADD %s %d DEL,%s,%d",
mr_status_sset,
version,
s_rule[i].table_name,
s_rule[i].rule_id);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0);
(*cnt)++;
append_cmd_cnt++;
// Try to remove from expiration sorted set, no matter wheather it exists or not.
redisAppendCommand(ctx,"ZREM %s %s,%d",rm_expire_sset,
redisAppendCommand(ctx,"ZREM %s %s,%d",
mr_expire_sset,
s_rule[i].table_name,
s_rule[i].rule_id);
expect_reply[*cnt].srule_seq=-1;
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
(*cnt)++;
append_cmd_cnt++;
redisAppendCommand(ctx,"ZREM %s %s,%d",rm_label_sset,
// Try to remove from label sorted set, no matter wheather it exists or not.
redisAppendCommand(ctx,"ZREM %s %s,%d",
mr_label_sset,
s_rule[i].table_name,
s_rule[i].rule_id);
expect_reply[*cnt].srule_seq=-1;
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
(*cnt)++;
append_cmd_cnt++;
break;
@@ -1198,13 +1222,12 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_
continue;
}
//s_rule[i].timeout>0 was checked by caller.
redisAppendCommand(ctx,"ZADD %s %lld %s,%d",rm_expire_sset
,s_rule[i].timeout
,s_rule[i].table_name
,s_rule[i].rule_id);
expect_reply[*cnt].srule_seq=i+offset;
expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER;
expect_reply[*cnt].reply.integer=0;
redisAppendCommand(ctx,"ZADD %s %lld %s,%d",
mr_expire_sset,
s_rule[i].timeout,
s_rule[i].table_name,
s_rule[i].rule_id);
expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0);
(*cnt)++;
append_cmd_cnt++;
@@ -1233,7 +1256,7 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
const int MAX_REDIS_OP_PER_SRULE=8;
unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end()
struct expected_reply_t *expected_reply=(struct expected_reply_t*)calloc(sizeof(struct expected_reply_t), max_multi_cmd_num);
struct expected_reply *expected_reply=(struct expected_reply*)calloc(sizeof(struct expected_reply), max_multi_cmd_num);
long long new_version=0;
int renew_num=0,ret=0;
for(i=0;i<serial_rule_num;i++)
@@ -1267,14 +1290,14 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned in
//failed is acceptable
//or transaciton is success
//or continuation of last failed
if(expected_reply[i].srule_seq==-1||1==mr_operation_success(p,&(expected_reply[i].reply))||last_failed==expected_reply[i].srule_seq)
if(expected_reply[i].srule_seq==-1||1==mr_operation_success(p, expected_reply+i)||last_failed==expected_reply[i].srule_seq)
{
continue;
}
rule_seq=expected_reply[i].srule_seq;
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
,"%s %s %d failed, rule id maybe conflict or not exist."
,rm_op_str[s_rule[rule_seq].op]
,mr_op_str[s_rule[rule_seq].op]
,s_rule[rule_seq].table_name,s_rule[rule_seq].rule_id);
success_cnt--;
last_failed=rule_seq;
@@ -1294,7 +1317,7 @@ error_out:
if(s_rule[i].op==MAAT_OP_RENEW_TIMEOUT)
{
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
,"%s %s %d is not allowed due to lock contention.",rm_op_str[MAAT_OP_RENEW_TIMEOUT]
,"%s %s %d is not allowed due to lock contention.",mr_op_str[MAAT_OP_RENEW_TIMEOUT]
, s_rule[i].table_name,s_rule[i].rule_id);
}
}
@@ -1370,7 +1393,7 @@ void check_maat_expiration(redisContext *ctx, void *logger)
server_time=redis_server_time(ctx);
data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_expire_sset,server_time);
data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",mr_expire_sset,server_time);
if(data_reply->type!=REDIS_REPLY_ARRAY||data_reply->elements==0)
{
freeReplyObject(data_reply);
@@ -1411,9 +1434,9 @@ void cleanup_update_status(redisContext *ctx, void *logger)
reply=_wrap_redisCommand(ctx,"MULTI");
freeReplyObject(reply);
redisAppendCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_version_sset,server_time-MAAT_REDIS_SYNC_TIME);
redisAppendCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",mr_version_sset,server_time-MAAT_REDIS_SYNC_TIME);
append_cmd_cnt++;
redisAppendCommand(ctx, "ZREMRANGEBYSCORE %s -inf %lld",rm_version_sset,server_time-MAAT_REDIS_SYNC_TIME);
redisAppendCommand(ctx, "ZREMRANGEBYSCORE %s -inf %lld",mr_version_sset,server_time-MAAT_REDIS_SYNC_TIME);
append_cmd_cnt++;
//consume reply "OK" and "QUEUED".
for(i=0;i<append_cmd_cnt;i++)
@@ -1442,7 +1465,7 @@ void cleanup_update_status(redisContext *ctx, void *logger)
freeReplyObject(reply);
//To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally.
reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s %lld %lld",rm_status_sset,version_lower_bound,version_upper_bound);
reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s %lld %lld",mr_status_sset,version_lower_bound,version_upper_bound);
entry_num=read_redis_integer(reply);
freeReplyObject(reply);
@@ -1749,11 +1772,11 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx* m
if(mr_ctx->write_ctx!=NULL&&mr_ctx->write_ctx->err==0)//authorized to write
{
//For thread safe, deliberately use redis_read_ctx but not redis_write_ctx.
if(1==redlock_try_lock(mr_ctx->read_ctx, rm_expire_lock, rm_expire_lock_timeout))
if(1==redlock_try_lock(mr_ctx->read_ctx, mr_expire_lock, mr_expire_lock_timeout))
{
check_maat_expiration(mr_ctx->read_ctx, logger);
cleanup_update_status(mr_ctx->read_ctx, logger);
redlock_unlock(mr_ctx->read_ctx, rm_expire_lock);
redlock_unlock(mr_ctx->read_ctx, mr_expire_lock);
}
}
if(mr_ctx->read_ctx==NULL||mr_ctx->read_ctx->err)
@@ -2299,7 +2322,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
int ret=0,i=0;
int new_region_num=0,new_group_num=0;
int serial_rule_num=0,serial_rule_idx=0;
int serial_rule_num=0,serial_rule_idx=0;
UNUSED int transaction_success=1;
struct _Maat_cmd_inner_t* p=NULL,*n=NULL;
@@ -2357,9 +2380,9 @@ int Maat_cmd_commit(Maat_feather_t feather)
serial_rule_idx+=build_serial_rule(_feather,p,s_rule+serial_rule_idx, serial_rule_num-serial_rule_idx);
p=p->next;
}
assert(serial_rule_idx==serial_rule_num);
transection_success=0;
transection_success=exec_serial_rule(write_ctx, s_rule,serial_rule_num,_feather->server_time,_feather->logger);
assert(serial_rule_idx==serial_rule_num);
transaction_success=0;
transaction_success=exec_serial_rule(write_ctx, s_rule,serial_rule_num,_feather->server_time,_feather->logger);
assert(transaction_success==serial_rule_num);
ret=_feather->cmd_q_cnt;
_feather->cmd_acc_num+=_feather->cmd_q_cnt;
@@ -2433,7 +2456,7 @@ int Maat_cmd_key_select(Maat_feather_t feather, int label_id, struct Maat_cmd_ke
return -1;
}
data_reply=_wrap_redisCommand(write_ctx,"ZRANGEBYSCORE %s %d %d",
data_reply=_wrap_redisCommand(write_ctx,"ZRANGEBYSCORE %s %d %d",
mr_label_sset,
label_id,
label_id);

View File

@@ -32,7 +32,7 @@
#include "stream_fuzzy_hash.h"
#include "gram_index_engine.h"
int MAAT_FRAME_VERSION_2_5_20181217=1;
int MAAT_FRAME_VERSION_2_5_20181221=1;
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
"unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""};
@@ -1849,7 +1849,6 @@ int sync_region(MESA_htable_handle region_hash,int region_id,const char* table_n
"region id %d of table %s is not unique.",region_id,table_name);
return -1;
}
}
else
{
@@ -2366,7 +2365,7 @@ void del_group_rule(struct Maat_table_desc* table,struct db_group_rule_t* db_gro
,db_group_rule->compile_id);
return;
}
if(compile_rule->group_cnt==0&&compile_rule->db_c_rule==NULL)
if(compile_rule->group_cnt==0&&compile_rule->is_valid==0)
{
HASH_delete_by_id(scanner->compile_hash, db_group_rule->compile_id);
garbage_bagging(GARBAGE_COMPILE_RULE, compile_rule, scanner->tomb_ref);

View File

@@ -36,7 +36,7 @@ Maat_feather_t g_feather=NULL;
void *g_logger=NULL;
int g_iThreadNum=4;
const char* table_info_path="./table_info.conf";
int scan_interval_ms=1;
int scan_interval_ms=500;
int effective_interval_ms=0;
void wait_for_cmd_effective(Maat_feather_t feather, long long version_before)
{
@@ -265,7 +265,6 @@ TEST(IPScan, IPv4)
return;
}
TEST(IPScan, IPv6)
{
int table_id=0,ret=0;
struct Maat_rule_t result[4];
@@ -290,6 +289,23 @@ TEST(IPScan, IPv6)
Maat_clean_status(&mid);
return;
}
TEST(Helper, ReadColumn)
{
const char* ip="192.168.0.1";
const char* tmp="something";
char line[256];
snprintf(line, sizeof(line), "1\t%s\t%s",ip, tmp);
size_t offset=0, len=0;
int ret=0;
ret=Maat_helper_read_column(line, 2, &offset, &len);
EXPECT_EQ(ret, 0);
EXPECT_EQ(0, strncmp(ip, line+offset, len));
ret=Maat_helper_read_column(line, 3, &offset, &len);
EXPECT_EQ(ret, 0);
EXPECT_EQ(0, strncmp(tmp, line+offset, len));
}
TEST(IntervalScan, Pure)
{
@@ -1212,6 +1228,57 @@ TEST_F(MaatCmdTest, SetExpr)
&result,NULL, 1,
&mid, 0);
EXPECT_EQ(ret, 0);
Maat_clean_status(&mid);
}
TEST_F(MaatCmdTest, RuleIDRecycle)
{
const char* table_name="HTTP_URL";
const char* scan_data="Reuse rule ID is allowed.";
const char* keywords="Reuse&rule";
Maat_feather_t feather=MaatCmdTest::_shared_feather;
struct Maat_rule_t result;
scan_status_t mid=NULL;
int table_id=0;
table_id=Maat_table_register(feather,table_name);
ASSERT_GT(table_id, 0);
int rule_id=(int)Maat_cmd_incrby(feather, "TEST_SEQ", 1);
int label_id=5211, ret=0;
test_add_expr_command(feather,table_name,rule_id, 0, label_id, keywords);
ret=Maat_cmd_commit(feather);
EXPECT_TRUE(ret>=0);
usleep(WAIT_FOR_EFFECTIVE_US);//waiting for commands go into effect
ret=Maat_full_scan_string(feather, table_id,CHARSET_GBK, scan_data, strlen(scan_data),
&result, NULL, 1,
&mid, 0);
Maat_clean_status(&mid);
EXPECT_EQ(ret, 1);
EXPECT_EQ(result.config_id, rule_id);
del_command(feather, rule_id);
usleep(WAIT_FOR_EFFECTIVE_US);//waiting for commands go into effect
ret=Maat_full_scan_string(feather, table_id,CHARSET_GBK, scan_data, strlen(scan_data),
&result, NULL, 1,
&mid, 0);
Maat_clean_status(&mid);
EXPECT_EQ(ret, 0);
test_add_expr_command(feather,table_name,rule_id, 0, label_id, keywords);
ret=Maat_cmd_commit(feather);
EXPECT_TRUE(ret>=0);
usleep(WAIT_FOR_EFFECTIVE_US);//waiting for commands go into effect
memset(&result, 0, sizeof(result));
ret=Maat_full_scan_string(feather, table_id,CHARSET_GBK, scan_data, strlen(scan_data),
&result, NULL, 1,
&mid, 0);
Maat_clean_status(&mid);
EXPECT_EQ(ret, 1);
EXPECT_EQ(result.config_id, rule_id);
return;
}