Maat command支持批量提交

This commit is contained in:
zhengchao
2020-06-17 19:59:55 +08:00
parent 2dbbd55c95
commit af27d7197c
5 changed files with 165 additions and 121 deletions

View File

@@ -27,15 +27,7 @@ const char* foreign_source_prefix="redis://";
const char* foreign_key_prefix="__FILE_";
struct _Maat_cmd_inner_t
{
struct Maat_cmd_t cmd;
enum MAAT_OPERATION op;
int ref_cnt;
int region_size[MAX_EXPR_ITEM_NUM];
char* huge_service_defined; //max to 4KB
struct _Maat_cmd_inner_t* next;
};
int _wrap_redisGetReply(redisContext *c, redisReply **reply)
{
return redisGetReply(c, (void **)reply);
@@ -1808,9 +1800,9 @@ void _maat_empty_region(struct Maat_region_t* p)
memset(p,0,sizeof(struct Maat_region_t));
return;
}
}
int Maat_command_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_rule, size_t n_line ,enum MAAT_OPERATION op)
{
{
size_t i=0;
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
int ret=0, table_id=0,success_cnt=0;
@@ -1825,8 +1817,8 @@ int Maat_cmd_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** l
if(!server_time)
{
return -1;
}
s_rule=ALLOC(struct serial_rule_t, line_num);
}
s_rule=ALLOC(struct serial_rule_t, n_line);
for(i=0;i<n_line;i++)
{
table_id=Maat_table_get_id_by_name(_feather->table_mgr, line_rule[i]->table_name);
@@ -1849,8 +1841,8 @@ int Maat_cmd_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** l
}
set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id, line_rule[i]->table_name,
line_rule[i]->table_line, absolute_expire_time);
}
success_cnt=exec_serial_rule(write_ctx,s_rule, line_num,server_time,_feather->logger);
}
success_cnt=exec_serial_rule(write_ctx,s_rule, n_line,server_time,_feather->logger);
if(success_cnt<0||(size_t)success_cnt!=n_line)//error
{
ret=-1;
@@ -1859,7 +1851,7 @@ int Maat_cmd_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** l
ret=success_cnt;
_feather->line_cmd_acc_num+=success_cnt;
error_out:
error_out:
for(i=0;i<n_line;i++)
{
empty_serial_rules(s_rule+i);
@@ -2129,18 +2121,6 @@ int Maat_cmd_key_select(Maat_feather_t feather, int label_id, struct Maat_cmd_ke
*keys=result;
return result_cnt;
}
int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsigned int size)
{
struct Maat_cmd_key* keys=NULL;
int result_cnt=0, i=0;
result_cnt=Maat_cmd_key_select(feather, label_id, &keys);
for(i=0; i<result_cnt && i<(int)size; i++)
{
output_ids[i]=keys[i].rule_id;
}
Maat_cmd_key_free(&keys, result_cnt);
return i;
int redis_flush_DB(redisContext* ctx, int db_index, void* logger)
{
@@ -2198,70 +2178,154 @@ int redis_flush_DB(redisContext* ctx, int db_index, void* logger)
);
}
return redis_transaction_success;
}
}
TAILQ_HEAD(serial_rule_q, serial_rule_t);
struct Maat_command_batch
{
struct Maat_cmd_line line_cmd;
const struct Maat_cmd_line *p=NULL;
char line[MAX_TABLE_LINE_SIZE];
serialize_compile(compile, huge_service_defined, clause_num, op, line, sizeof(line));
memset(&line_cmd, 0, sizeof(line_cmd));
line_cmd.table_name=table_name;
line_cmd.rule_id=compile->config_id;
line_cmd.table_line=line;
line_cmd.label_id=label_id;
line_cmd.expire_after=expire_after;
p=&line_cmd;
int ret=Maat_cmd_raw_set_lines(feather, &p, 1, op);
{
int batch_size;
serial_rule_q queue;
struct _Maat_feather_t * feather;
long long server_time;
};
struct Maat_command_batch* Maat_command_batch_new(Maat_feather_t feather)
{
struct Maat_command_batch* batch=ALLOC(struct Maat_command_batch, 1);
TAILQ_INIT(&batch->queue);
batch->feather=(struct _Maat_feather_t *)feather;
redisContext* write_ctx=get_redis_ctx_for_write(batch->feather);
if(write_ctx==NULL)
{
free(batch);
return NULL;
}
batch->server_time=redis_server_time(write_ctx);
if(!batch->server_time)
{
free(batch);
return NULL;
}
return batch;
}
}
int Maat_command_batch_set_region(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id)
{
struct Maat_cmd_line line_cmd;
const struct Maat_cmd_line *p=NULL;
{
struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
long long absolute_expire_time=0;
char line[MAX_TABLE_LINE_SIZE];
char line[MAX_TABLE_LINE_SIZE];
serialize_region(region, group_id, line, sizeof(line));
memset(&line_cmd, 0, sizeof(line_cmd));
line_cmd.table_name=region->table_name;
line_cmd.rule_id=region->region_id;
line_cmd.table_line=line;
p=&line_cmd;
int ret=Maat_cmd_raw_set_lines(feather, &p, 1, op);
return ret;
}
int Maat_command_raw_set_group2compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c)
{
struct Maat_cmd_line line_cmd;
serialize_region(region, group_id, line, sizeof(line));
set_serial_rule(s_rule, op, region->region_id, 0, region->table_name,
line, absolute_expire_time);
TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
batch->batch_size++;
return 0;
serialize_group2compile(op, g2c, line, sizeof(line));
assert(g2c->group_id<1024*1024);
memset(&line_cmd, 0, sizeof(line_cmd));
line_cmd.table_name=g2c->table_name;
line_cmd.rule_id=g2c->compile_id*1024*1024+g2c->group_id;
line_cmd.table_line=line;
p=&line_cmd;
int ret=Maat_cmd_raw_set_lines(feather, &p, 1, op);
}
}
int Maat_command_batch_set_group2group(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g)
{
struct Maat_cmd_line line_cmd;
{
struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
long long absolute_expire_time=0;
char line[MAX_TABLE_LINE_SIZE];
assert(g2g->group_id<1024*1024);
serialize_group2group(op, g2g, line, sizeof(line));
memset(&line_cmd, 0, sizeof(line_cmd));
line_cmd.table_name=g2g->table_name;
line_cmd.rule_id=g2g->superior_group_id*1024*1024+g2g->group_id;
line_cmd.table_line=line;
p=&line_cmd;
int ret=Maat_cmd_raw_set_lines(feather, &p, 1, op);
serialize_group2group(op, g2g, line, sizeof(line));
set_serial_rule(s_rule, op, g2g->superior_group_id*1024*1024+g2g->group_id, 0, g2g->table_name,
line, absolute_expire_time);
TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
batch->batch_size++;
return 0;
}
int Maat_command_batch_set_group2compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c)
{
struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
long long absolute_expire_time=0;
char line[MAX_TABLE_LINE_SIZE];
serialize_group2compile(op, g2c, line, sizeof(line));
assert(g2c->group_id<1024*1024);
set_serial_rule(s_rule, op, g2c->compile_id*1024*1024+g2c->group_id, 0, g2c->table_name,
line, absolute_expire_time);
TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
batch->batch_size++;
return 0;
}
int Maat_command_batch_set_compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after)
{
struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1);
long long absolute_expire_time=0;
char line[MAX_TABLE_LINE_SIZE];
serialize_compile(compile, huge_service_defined, clause_num, op, line, sizeof(line));
if(expire_after>0)
{
absolute_expire_time=batch->server_time+expire_after;
}
set_serial_rule(s_rule, op, compile->config_id, label_id, table_name,
line, absolute_expire_time);
TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries);
batch->batch_size++;
return 0;
}
int Maat_command_batch_commit(struct Maat_command_batch* batch)
{
struct serial_rule_t* s_rule_array=ALLOC(struct serial_rule_t, batch->batch_size);
int i=0;
redisContext* write_ctx=get_redis_ctx_for_write(batch->feather);
struct serial_rule_t * tmp = TAILQ_FIRST(&batch->queue);
while(tmp != NULL)
{
TAILQ_REMOVE(&batch->queue, tmp, entries);
memcpy(s_rule_array+i, tmp, sizeof(*tmp));
free(tmp);
tmp = TAILQ_FIRST(&batch->queue);
i++;
}
assert(i==batch->batch_size);
exec_serial_rule(write_ctx, s_rule_array, batch->batch_size, batch->server_time, batch->feather->logger);
for(i=0; i<batch->batch_size; i++)
{
empty_serial_rules(s_rule_array+i);
}
free(s_rule_array);
free(batch);
return i;
}
int Maat_command_raw_set_compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after)
{
struct Maat_command_batch* batch=NULL;
batch=Maat_command_batch_new(feather);
Maat_command_batch_set_compile(batch, op, compile, table_name, huge_service_defined, clause_num, label_id, expire_after);
Maat_command_batch_commit(batch);
return 0;
}
int Maat_command_raw_set_region(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id)
{
struct Maat_command_batch* batch=NULL;
batch=Maat_command_batch_new(feather);
Maat_command_batch_set_region(batch, op, region, group_id);
Maat_command_batch_commit(batch);
return 0;
}
int Maat_command_raw_set_group2compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c)
{
struct Maat_command_batch* batch=NULL;
batch=Maat_command_batch_new(feather);
Maat_command_batch_set_group2compile(batch, op, g2c);
Maat_command_batch_commit(batch);
return 0;
}
int Maat_command_raw_set_group2group(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g)
{
struct Maat_command_batch* batch=NULL;
batch=Maat_command_batch_new(feather);
Maat_command_batch_set_group2group(batch, op, g2g);
Maat_command_batch_commit(batch);
return 0;
}