redis内部key改为unsigned long,适配性能测试用例。

This commit is contained in:
zhengchao
2020-06-18 21:59:44 +08:00
parent af27d7197c
commit 73d27d983c
7 changed files with 226 additions and 153 deletions

View File

@@ -97,7 +97,7 @@ int connect_redis_for_write(struct source_redis_ctx* mr_ctx, void* logger)
return 0;
}
}
redisContext* get_redis_ctx_for_write(_Maat_feather_t * feather)
redisContext* get_redis_ctx_for_write(struct _Maat_feather_t * feather)
{
int ret=0;
if(feather->mr_ctx.write_ctx==NULL)
@@ -361,7 +361,7 @@ void empty_serial_rules(struct serial_rule_t* rule)
memset(rule,0,sizeof(struct serial_rule_t));
return;
}
void set_serial_rule(struct serial_rule_t* rule, enum MAAT_OPERATION op,int rule_id,int label_id,const char* table_name,const char* line, long long timeout)
void set_serial_rule(struct serial_rule_t* rule, enum MAAT_OPERATION op, unsigned long rule_id,int label_id,const char* table_name,const char* line, long long timeout)
{
memset(rule, 0, sizeof(struct serial_rule_t));
rule->op=op;
@@ -430,7 +430,7 @@ int get_inc_key_list(long long instance_version, long long target_version, redis
for(i=0, j=0;i<reply->elements;i++)
{
assert(reply->element[i]->type==REDIS_REPLY_STRING);
ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%ld",op_str,s_rule[j].table_name,&(s_rule[j].rule_id));
ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%lu",op_str,s_rule[j].table_name,&(s_rule[j].rule_id));
if(ret!=3||s_rule[i].rule_id<0)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
@@ -989,7 +989,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s
switch(s_rule[i].op)
{
case MAAT_OP_ADD:
redisAppendCommand(ctx,"SET %s:%s,%d %s",
redisAppendCommand(ctx,"SET %s:%s,%lu %s",
mr_key_prefix[MAAT_OP_ADD],
s_rule[i].table_name,
s_rule[i].rule_id,
@@ -998,7 +998,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s
(*cnt)++;
append_cmd_cnt++;
//Allowing add duplicated members for rule id recycling.
redisAppendCommand(ctx,"RPUSH %s ADD,%s,%d",
redisAppendCommand(ctx,"RPUSH %s ADD,%s,%lu",
transaction_list,
s_rule[i].table_name,
s_rule[i].rule_id);
@@ -1007,7 +1007,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s
append_cmd_cnt++;
if(s_rule[i].timeout>0)
{
redisAppendCommand(ctx,"ZADD %s %lld %s,%d",
redisAppendCommand(ctx,"ZADD %s %lld %s,%lu",
mr_expire_sset,
s_rule[i].timeout,
s_rule[i].table_name,
@@ -1019,7 +1019,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s
}
if(s_rule[i].label_id>0)
{
redisAppendCommand(ctx,"ZADD %s %d %s,%d",
redisAppendCommand(ctx,"ZADD %s %d %s,%lu",
mr_label_sset,
s_rule[i].label_id,
s_rule[i].table_name,
@@ -1033,7 +1033,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s
}
break;
case MAAT_OP_DEL:
redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d",
redisAppendCommand(ctx,"RENAME %s:%s,%lu %s:%s,%lu",
mr_key_prefix[MAAT_OP_ADD],
s_rule[i].table_name,
s_rule[i].rule_id,
@@ -1045,7 +1045,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s
(*cnt)++;
append_cmd_cnt++;
redisAppendCommand(ctx,"EXPIRE %s:%s,%d %d",
redisAppendCommand(ctx,"EXPIRE %s:%s,%lu %d",
mr_key_prefix[MAAT_OP_DEL],
s_rule[i].table_name,
s_rule[i].rule_id,
@@ -1055,7 +1055,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s
append_cmd_cnt++;
//NX: Don't update already exisiting elements. Always add new elements.
redisAppendCommand(ctx,"RPUSH %s DEL,%s,%d",
redisAppendCommand(ctx,"RPUSH %s DEL,%s,%lu",
transaction_list,
s_rule[i].table_name,
s_rule[i].rule_id);
@@ -1064,7 +1064,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s
append_cmd_cnt++;
// Try to remove from expiration sorted set, no matter wheather it exists or not.
redisAppendCommand(ctx,"ZREM %s %s,%d",
redisAppendCommand(ctx,"ZREM %s %s,%lu",
mr_expire_sset,
s_rule[i].table_name,
s_rule[i].rule_id);
@@ -1073,7 +1073,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s
append_cmd_cnt++;
// Try to remove from label sorted set, no matter wheather it exists or not.
redisAppendCommand(ctx,"ZREM %s %s,%d",
redisAppendCommand(ctx,"ZREM %s %s,%lu",
mr_label_sset,
s_rule[i].table_name,
s_rule[i].rule_id);
@@ -1087,7 +1087,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s
continue;
}
//s_rule[i].timeout>0 was checked by caller.
redisAppendCommand(ctx,"ZADD %s %lld %s,%d",
redisAppendCommand(ctx,"ZADD %s %lld %s,%lu",
mr_expire_sset,
s_rule[i].timeout,
s_rule[i].table_name,
@@ -2051,13 +2051,13 @@ long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment)
}
freeReplyObject(data_reply);
return result;
}
}
int Maat_command_get_new_group_id(Maat_feather_t feather)
{
int group_id=0;
group_id=(int) Maat_cmd_incrby(feather, mr_group_id_var, 1);
return group_id;
}
}
int Maat_command_get_new_region_id(Maat_feather_t feather)
{
int region_id=0;
@@ -2212,6 +2212,7 @@ int Maat_command_batch_set_region(struct Maat_command_batch* batch, enum MAAT_O
{
struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
long long absolute_expire_time=0;
char line[MAX_TABLE_LINE_SIZE];
serialize_region(region, group_id, line, sizeof(line));
@@ -2221,17 +2222,19 @@ int Maat_command_batch_set_region(struct Maat_command_batch* batch, enum MAAT_O
batch->batch_size++;
return 0;
}
#define TO_GROUP2X_KEY(group_id, parent_id) ((unsigned long)group_id<<32|parent_id)
int Maat_command_batch_set_group2group(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g)
{
struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
long long absolute_expire_time=0;
char line[MAX_TABLE_LINE_SIZE];
serialize_group2group(op, g2g, line, sizeof(line));
set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2g->group_id, g2g->superior_group_id), 0, g2g->table_name,
line, absolute_expire_time);
TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
batch->batch_size++;
@@ -2242,10 +2245,8 @@ int Maat_command_batch_set_group2compile(struct Maat_command_batch* batch, enum
struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
long long absolute_expire_time=0;
char line[MAX_TABLE_LINE_SIZE];
serialize_group2compile(op, g2c, line, sizeof(line));
assert(g2c->group_id<1024*1024);
serialize_group2compile(op, g2c, line, sizeof(line));
set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2c->group_id, g2c->compile_id), 0, g2c->table_name,
line, absolute_expire_time);
TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
@@ -2264,7 +2265,8 @@ int Maat_command_batch_set_compile(struct Maat_command_batch* batch, enum MAAT_O
{
absolute_expire_time=batch->server_time+expire_after;
}
set_serial_rule(s_rule, op, compile->config_id, label_id, table_name,
set_serial_rule(s_rule, op, compile->config_id, label_id, table_name,
line, absolute_expire_time);
TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
batch->batch_size++;
@@ -2275,10 +2277,11 @@ int Maat_command_batch_commit(struct Maat_command_batch* batch)
struct serial_rule_t* s_rule_array=ALLOC(struct serial_rule_t, batch->batch_size);
int i=0;
redisContext* write_ctx=get_redis_ctx_for_write(batch->feather);
struct serial_rule_t * tmp = TAILQ_FIRST(&batch->queue);
while(tmp != NULL)
{
TAILQ_REMOVE(&batch->queue, tmp, entries);
TAILQ_REMOVE(&batch->queue, tmp, entries);
memcpy(s_rule_array+i, tmp, sizeof(*tmp));
free(tmp);
tmp = TAILQ_FIRST(&batch->queue);

View File

@@ -1200,7 +1200,7 @@ int write_iris(cJSON *json, struct iris_description_t *p_iris, void* logger)
"compile rule %d have no groups.",compile_id);
return -1;
}
i=1;
i=0;
cJSON_ArrayForEach(group_obj, group_array)
{
ret=write_group_rule(group_obj, compile_id, PARENT_TYPE_COMPILE, compile_id, i, p_iris, logger);

View File

@@ -348,7 +348,7 @@ struct foreign_key
struct serial_rule_t //rm= Redis Maat
{
enum MAAT_OPERATION op;//0: delete, 1: add.
long rule_id;
unsigned long rule_id;
int label_id;
char with_error;
long long timeout; // absolute unix time.
@@ -386,7 +386,7 @@ MAAT_RULE_EX_DATA rule_ex_data_new(const struct Maat_rule_head * rule_head, cons
void rule_ex_data_free(const struct Maat_rule_head * rule_head, const char* srv_def, MAAT_RULE_EX_DATA *ad, const struct compile_ex_data_idx* ex_desc);
void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_id,int label_id,const char* table_name,const char* line, long long timeout);
void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op, unsigned long rule_id,int label_id,const char* table_name,const char* line, long long timeout);
void empty_serial_rules(struct serial_rule_t* rule);
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,unsigned int serial_rule_num, long long server_time, void* logger);
long long redis_server_time(redisContext* ctx);