增加set_line、timeout、incrby、select的测试用例,自测通过。

This commit is contained in:
zhengchao
2017-07-07 11:04:11 +08:00
parent 3571096bb6
commit 757f8138ed
4 changed files with 246 additions and 145 deletions

View File

@@ -49,6 +49,20 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...)
va_end(ap);
return (redisReply *)reply;
}
int connect_redis_for_write(_Maat_feather_t * feather)
{
int ret=0;
assert(feather->redis_write_ctx==NULL);
feather->redis_write_ctx=redisConnectWithTimeout(feather->redis_ip, feather->redis_port,feather->connect_timeout);
if(feather->redis_write_ctx==NULL)
{
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
,"Redis connect %s:%d for write failed."
,feather->redis_ip,feather->redis_port);
ret=-1;
}
return ret;
}
long long read_redis_integer(const redisReply* reply)
{
switch(reply->type)
@@ -74,8 +88,8 @@ long long redis_server_time(redisContext* ctx)
long long server_time=0;
redisReply* data_reply=NULL;
data_reply=_wrap_redisCommand(ctx,"TIME");
assert(data_reply->type==REDIS_REPLY_INTEGER);
server_time=data_reply->integer;
assert(data_reply->type==REDIS_REPLY_ARRAY);
server_time=atoll(data_reply->element[0]->str);
freeReplyObject(data_reply);
return server_time;
}
@@ -167,7 +181,7 @@ void invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq)
}
int del_rule_from_redis(redisContext* ctx, struct serial_rule_t* s_rule, long long new_version)
{
int append_cmd_cnt=0<EFBFBD><EFBFBD>
int append_cmd_cnt=0;
redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d"
,rm_key_prefix[MAAT_OP_ADD]
,s_rule->table_name
@@ -543,11 +557,11 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
}
snprintf(line,sizeof(line),"%d\t%d\t1",p_group->group_id
,p_m_rule->config_id);
set_serial_rule(list+rule_num,MAAT_OP_ADD,p_group->group_id,0,feather->group_tn,line);
set_serial_rule(list+rule_num,MAAT_OP_ADD,p_group->group_id,0,feather->group_tn,line,timeout);
}
else
{
set_serial_rule(list+rule_num,MAAT_OP_DEL,p_group->group_id,0,feather->group_tn,NULL);
set_serial_rule(list+rule_num,MAAT_OP_DEL,p_group->group_id,0,feather->group_tn,NULL,0);
}
rule_num++;
if(p_group->regions==NULL)//group reuse.
@@ -566,12 +580,12 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st
}
serialize_region(p_region, p_group->group_id, line, sizeof(line));
set_serial_rule(list+rule_num,MAAT_OP_ADD
,p_region->region_id,0,p_region->table_name,line);
,p_region->region_id,0,p_region->table_name,line,0);
}
else
{
set_serial_rule(list+rule_num,MAAT_OP_DEL
,p_region->region_id,0,p_region->table_name,NULL);
,p_region->region_id,0,p_region->table_name,NULL,0);
}
rule_num++;
@@ -591,6 +605,74 @@ int mr_transaction_success(redisReply* data_reply)
return 1;
}
}
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num)
{
int append_cmd_cnt=0,i=0;
long long maat_redis_version=0;
redisReply* data_reply=NULL;
int redis_transaction_success=1;
data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION");
freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION");
maat_redis_version=read_redis_integer(data_reply);
maat_redis_version++;
freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx,"MULTI");
freeReplyObject(data_reply);
append_cmd_cnt=0;
for(i=0;i<serial_rule_num;i++)
{
if(s_rule[i].op==MAAT_OP_ADD)
{
redisAppendCommand(ctx,"SET %s:%s,%d %s",rm_key_prefix[MAAT_OP_ADD]
,s_rule[i].table_name
,s_rule[i].rule_id
,s_rule[i].table_line);
append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"ZADD %s NX %lld ADD,%s,%d",rm_status_sset
,maat_redis_version
,s_rule[i].table_name
,s_rule[i].rule_id);
append_cmd_cnt++;
if(s_rule[i].timeout>0)
{
redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset
,s_rule[i].timeout
,s_rule[i].table_name
,s_rule[i].rule_id);
append_cmd_cnt++;
}
if(s_rule[i].label_id>0)
{
redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset
,s_rule[i].label_id
,s_rule[i].rule_id);
append_cmd_cnt++;
}
}
else
{
append_cmd_cnt+=del_rule_from_redis(ctx,s_rule+i,maat_redis_version);
}
}
redisAppendCommand(ctx,"INCRBY MAAT_VERSION 1");
append_cmd_cnt++;
redisAppendCommand(ctx,"EXEC");
append_cmd_cnt++;
redis_transaction_success=1;
for(i=0;i<append_cmd_cnt;i++)
{
_wrap_redisGetReply(ctx, &data_reply);
if(0==mr_transaction_success(data_reply))
{
redis_transaction_success=0;
}
freeReplyObject(data_reply);
}
return redis_transaction_success;
}
int fix_table_name(_Maat_feather_t* feather,struct Maat_cmd_t* cmd)
{
int i=0,j=0,ret=0;
@@ -646,28 +728,29 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_cmd_t* cmd)
void check_maat_expiration(redisContext *ctx, void *logger)
{
unsigned int i=0,s_rule_num=0;
int ret=0,append_cmd_cnt=0;
int ret=0;
int is_success=0;
redisReply* data_reply=NULL;
struct serial_rule_t* s_rule=NULL;
long long server_time=0,maat_redis_version=0;
long long server_time=0;
data_reply=_wrap_redisCommand(ctx, "TIME");
server_time=data_reply->element[0].integer;
server_time=data_reply->element[0]->integer;
freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_expire_sset,server_time);
if(data_reply->type!=REDIS_REPLY_ARRAY)
if(data_reply->type!=REDIS_REPLY_ARRAY||data_reply->elements==0)
{
freeReplyObject(data_reply);
return;
}
s_rule_num=data_reply->elements;
s_rule=(struct serial_rule_t*)calloc(sizeof(struct serial_rule_t)*s_rule_num);
s_rule=(struct serial_rule_t*)calloc(sizeof(struct serial_rule_t),s_rule_num);
for(i=0;i<s_rule_num;i++)
{
s_rule[i].op=MAAT_OP_DEL;
ret=sscanf(data_reply->element[i].str,"%[^,],%d",s_rule[i].table_name,&(s_rule.rule_id));
ret=sscanf(data_reply->element[i]->str,"%[^,],%d",s_rule[i].table_name,&(s_rule[i].rule_id));
assert(ret==2);
}
freeReplyObject(data_reply);
is_success=exec_serial_rule(ctx,s_rule, s_rule_num);
if(is_success==1)
@@ -743,74 +826,7 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
rule_list=NULL;
return;
}
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num)
{
int append_cmd_cnt=0,i=0;
long long maat_redis_version=0;
redisReply* data_reply=NULL;
int redis_transaction_success=1;
data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION");
freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION");
maat_redis_version=read_redis_integer(data_reply);
maat_redis_version++;
freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(ctx,"MULTI");
freeReplyObject(data_reply);
append_cmd_cnt=0;
for(i=0;i<serial_rule_num;i++)
{
if(s_rule[i].op==MAAT_OP_ADD)
{
redisAppendCommand(ctx,"SET %s:%s,%d %s",rm_key_prefix[MAAT_OP_ADD]
,s_rule[i].table_name
,s_rule[i].rule_id
,s_rule[i].table_line);
append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"ZADD %s NX %lld ADD,%s,%d",rm_status_sset
,maat_redis_version
,s_rule[i].table_name
,s_rule[i].rule_id);
append_cmd_cnt++;
if(s_rule[i].timeout>0)
{
redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset
,s_rule[i].timeout
,s_rule[i].table_name
,s_rule[i].rule_id);
append_cmd_cnt++;
}
if(s_rule[i].label_id>0)
{
redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset
,s_rule[i].label_id
,s_rule[i].rule_id);
append_cmd_cnt++;
}
}
else
{
append_cmd_cnt+=del_rule_from_redis(ctx,s_rule+i,maat_redis_version);
}
}
redisAppendCommand(ctx,"INCRBY MAAT_VERSION 1");
append_cmd_cnt++;
redisAppendCommand(ctx,"EXEC");
append_cmd_cnt++;
redis_transaction_success=1;
for(i=0;i<append_cmd_cnt;i++)
{
_wrap_redisGetReply(ctx, &data_reply);
if(0==mr_transaction_success(data_reply))
{
redis_transaction_success=0;
}
freeReplyObject(data_reply);
}
return redis_transaction_success;
}
void _maat_copy_region(struct Maat_region_t* dst,const struct Maat_region_t* src)
{
memcpy(dst,src,sizeof(struct Maat_region_t));
@@ -874,7 +890,7 @@ void _maat_empty_region(struct Maat_region_t* p)
return;
}
struct Maat_cmd_t* Maat_create_cmd(const struct Maat_rule_t* rule, int group_num, const char* label)
struct Maat_cmd_t* Maat_create_cmd(const struct Maat_rule_t* rule, int group_num)
{
int i=0;
struct _Maat_cmd_inner_t* _cmd=(struct _Maat_cmd_inner_t*)calloc(sizeof(struct _Maat_cmd_inner_t),1);
@@ -909,40 +925,36 @@ int Maat_cmd_set_group(Maat_feather_t feather,int group_id, const struct Maat_re
assert(0);
return 0;
}
int Maat_cmd_set_line(Maat_feather_t feather, const char* table_name,int rule_id, const char* line, int timeout,enum MAAT_OPERATION op)
int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule, enum MAAT_OPERATION op)
{
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
int ret=0, table_id=0,retry=0;
struct serial_rule_t s_rule;
long long absolute_expire_time=0;
if(_feather->AUTO_NUMBERING_ON==1)
{
return -1;
}
ret=map_str2int(_feather->map_tablename2id, table_name, &table_id);
ret=map_str2int(_feather->map_tablename2id, line_rule->table_name, &table_id);
if(ret<0)
{
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
,"Command set line id %d failed: unknown table %s."
,rule_id
,table_name);
, line_rule->rule_id
, line_rule->table_name);
return -1;
}
if(TABLE_TYPE_PLUGIN!=feather->p_table_info[table_id]->table_type)
if(TABLE_TYPE_PLUGIN!=_feather->p_table_info[table_id]->table_type)
{
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
,"Command set line id %d failed: table %s is not a plugin table."
,rule_id
,table_name);
, line_rule->rule_id
, line_rule->table_name);
return -1;
}
if(timeout>0)
if( line_rule->expire_after>0)
{
absolute_expire_time=redis_server_time(_feather->redis_write_ctx);
absolute_expire_time+=timeout;
absolute_expire_time+=line_rule->expire_after;
}
set_serial_rule(&s_rule, op, rule_id,table_name,line, absolute_expire_time);
set_serial_rule(&s_rule, op,line_rule->rule_id,line_rule->label_id,line_rule->table_name,line_rule->table_line, absolute_expire_time);
ret=0;
while(!ret)
{
@@ -1019,6 +1031,7 @@ int Maat_cmd_append(Maat_feather_t feather,struct Maat_cmd_t* cmd,enum MAAT_OPER
int ret=0;
_cmd->op=op;
assert(op==MAAT_OP_DEL||op==MAAT_OP_ADD);
assert(_cmd->next==NULL);
if(op==MAAT_OP_DEL)
{
ret=reconstruct_cmd(_feather, _cmd);
@@ -1051,8 +1064,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
{
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
int ret=0,i=0,retry=0,append_cmd_cnt=0;
long long maat_redis_version=0;
int ret=0,i=0,retry=0;
int new_region_num=0,new_group_num=0;
int serial_rule_num=0,serial_rule_idx=0;
int transection_success=1;
@@ -1072,13 +1084,9 @@ int Maat_cmd_commit(Maat_feather_t feather)
}
if(_feather->redis_write_ctx==NULL)
{
_feather->redis_write_ctx=redisConnectWithTimeout(_feather->redis_ip, _feather->redis_port,_feather->connect_timeout);
if(_feather->redis_write_ctx==NULL)
ret=connect_redis_for_write(_feather);
if(ret!=0)
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
,"Redis connect %s:%d failed when appending command."
,_feather->redis_ip,_feather->redis_port);
ret=-1;
goto error_out;
}
}
@@ -1090,7 +1098,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
}
_feather->server_time=redis_server_time(ctx);
if(feather->AUTO_NUMBERING_ON==1)
if(_feather->AUTO_NUMBERING_ON==1)
{
data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_REGION %d",new_region_num);
assert(data_reply->type==REDIS_REPLY_INTEGER);
@@ -1144,24 +1152,42 @@ long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment)
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
redisReply* data_reply=NULL;
long long result=0;
int ret=0;
if(_feather->redis_write_ctx==NULL)
{
ret=connect_redis_for_write(_feather);
if(ret!=0)
{
return -1;
}
}
data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"INCRBY %s %d", key, increment);
assert(data_reply->type==REDIS_REPLY_INTEGER);
result=data_reply->integer;
freeReplyObject(data_reply);
return result;
}
int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, int size)
int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsigned int size)
{
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
redisReply* data_reply=NULL;
unsigned int i=0;
int ret=0;
if(_feather->redis_write_ctx==NULL)
{
ret=connect_redis_for_write(_feather);
if(ret!=0)
{
return -1;
}
}
data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"ZRANGEBYSCORE %s %d %d"
,rm_label_sset
,label_id
,label_id);
for(i=0;i<data_reply->element&&i<size;i++)
for(i=0;i<data_reply->elements&&i<size;i++)
{
output_ids[i]=atoi(data_reply->element[i]->integer);
output_ids[i]=atoi(data_reply->element[i]->str);
}
freeReplyObject(data_reply);
return i;