diff --git a/inc/Maat_command.h b/inc/Maat_command.h index 7032fde..9c4ee5c 100644 --- a/inc/Maat_command.h +++ b/inc/Maat_command.h @@ -120,17 +120,6 @@ struct Maat_cmd_region struct Maat_rgn_sim_t similarity_rule; }; }; - - -struct Maat_cmd_t -{ - //This Struct MUST alloced by Maat_create_cmd(), then released by Maat_free_cmd(). - struct Maat_rule_t compile; // for MAAT_OP_DEL, only compile.config_id is necessary. - int group_num; // for MAAT_OP_DEL, set to 0. - int expire_after; //expired after $expire_after$ seconds, set to 0 for never timeout. - int label_id; //>0, to be indexed and quried by Maat_cmd_select; =0 not index - struct Maat_group_t* groups;// Add regions with Maat_add_region2cmd -}; struct Maat_cmd_line { const char* table_name; @@ -187,6 +176,14 @@ int Maat_command_raw_set_group2compile(Maat_feather_t feather, enum MAAT_OPERATI //@param label_id: bigger than 0 means this compile rule is to be indexed and quried by Maat_cmd_select; =0 not index int Maat_command_raw_set_compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after); +struct Maat_command_batch; +struct Maat_command_batch* Maat_command_batch_new(Maat_feather_t feather); + +int Maat_command_batch_set_region(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id); +int Maat_command_batch_set_group2group(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g); +int Maat_command_batch_set_group2compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c); +int Maat_command_batch_set_compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after); +int Maat_command_batch_commit(struct Maat_command_batch* batch); int Maat_cmd_get_new_group_id(Maat_feather_t feather); int Maat_cmd_get_new_region_id(Maat_feather_t feather); diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index 7d4042a..12659d7 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -27,15 +27,7 @@ const char* foreign_source_prefix="redis://"; const char* foreign_key_prefix="__FILE_"; -struct _Maat_cmd_inner_t -{ - struct Maat_cmd_t cmd; - enum MAAT_OPERATION op; - int ref_cnt; - int region_size[MAX_EXPR_ITEM_NUM]; - char* huge_service_defined; //max to 4KB - struct _Maat_cmd_inner_t* next; -}; + int _wrap_redisGetReply(redisContext *c, redisReply **reply) { return redisGetReply(c, (void **)reply); @@ -1808,9 +1800,9 @@ void _maat_empty_region(struct Maat_region_t* p) return; } -int Maat_cmd_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_rule, int line_num ,enum MAAT_OPERATION op) +int Maat_command_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_rule, size_t n_line ,enum MAAT_OPERATION op) { - int i=0; + size_t i=0; _Maat_feather_t* _feather=(_Maat_feather_t*)feather; int ret=0, table_id=0,success_cnt=0; struct serial_rule_t *s_rule=NULL; @@ -1825,8 +1817,8 @@ int Maat_cmd_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** l { return -1; } - s_rule=ALLOC(struct serial_rule_t, line_num); - for(i=0;itable_mgr, line_rule[i]->table_name); if(table_id<0) @@ -1849,8 +1841,8 @@ int Maat_cmd_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** l set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id, line_rule[i]->table_name, line_rule[i]->table_line, absolute_expire_time); } - success_cnt=exec_serial_rule(write_ctx,s_rule, line_num,server_time,_feather->logger); - if(success_cnt<0||success_cnt!=line_num)//error + success_cnt=exec_serial_rule(write_ctx,s_rule, n_line,server_time,_feather->logger); + if(success_cnt<0||(size_t)success_cnt!=n_line)//error { ret=-1; goto error_out; @@ -1859,7 +1851,7 @@ int Maat_cmd_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** l _feather->line_cmd_acc_num+=success_cnt; error_out: - for(i=0;iconfig_id; - line_cmd.table_line=line; - line_cmd.label_id=label_id; - line_cmd.expire_after=expire_after; - p=&line_cmd; - int ret=Maat_cmd_raw_set_lines(feather, &p, 1, op); - return ret; + int batch_size; + serial_rule_q queue; + struct _Maat_feather_t * feather; + long long server_time; +}; +struct Maat_command_batch* Maat_command_batch_new(Maat_feather_t feather) +{ + struct Maat_command_batch* batch=ALLOC(struct Maat_command_batch, 1); + TAILQ_INIT(&batch->queue); + batch->feather=(struct _Maat_feather_t *)feather; + redisContext* write_ctx=get_redis_ctx_for_write(batch->feather); + if(write_ctx==NULL) + { + free(batch); + return NULL; + } + batch->server_time=redis_server_time(write_ctx); + if(!batch->server_time) + { + free(batch); + return NULL; + } + return batch; } -int Maat_command_raw_set_region(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id) + +int Maat_command_batch_set_region(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id) { - struct Maat_cmd_line line_cmd; - const struct Maat_cmd_line *p=NULL; - + struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); + long long absolute_expire_time=0; char line[MAX_TABLE_LINE_SIZE]; - serialize_region(region, group_id, line, sizeof(line)); - memset(&line_cmd, 0, sizeof(line_cmd)); - line_cmd.table_name=region->table_name; - line_cmd.rule_id=region->region_id; - line_cmd.table_line=line; - p=&line_cmd; - int ret=Maat_cmd_raw_set_lines(feather, &p, 1, op); - return ret; -} -int Maat_command_raw_set_group2compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c) -{ - struct Maat_cmd_line line_cmd; - const struct Maat_cmd_line *p=NULL; - char line[MAX_TABLE_LINE_SIZE]; + set_serial_rule(s_rule, op, region->region_id, 0, region->table_name, + line, absolute_expire_time); + TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); + batch->batch_size++; + return 0; - serialize_group2compile(op, g2c, line, sizeof(line)); - assert(g2c->group_id<1024*1024); - memset(&line_cmd, 0, sizeof(line_cmd)); - line_cmd.table_name=g2c->table_name; - line_cmd.rule_id=g2c->compile_id*1024*1024+g2c->group_id; - line_cmd.table_line=line; - p=&line_cmd; - int ret=Maat_cmd_raw_set_lines(feather, &p, 1, op); - return ret; } -int Maat_command_raw_set_group2group(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g) +int Maat_command_batch_set_group2group(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g) { - struct Maat_cmd_line line_cmd; - const struct Maat_cmd_line *p=NULL; + struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); + long long absolute_expire_time=0; char line[MAX_TABLE_LINE_SIZE]; assert(g2g->group_id<1024*1024); serialize_group2group(op, g2g, line, sizeof(line)); - memset(&line_cmd, 0, sizeof(line_cmd)); - line_cmd.table_name=g2g->table_name; - line_cmd.rule_id=g2g->superior_group_id*1024*1024+g2g->group_id; - line_cmd.table_line=line; - p=&line_cmd; - int ret=Maat_cmd_raw_set_lines(feather, &p, 1, op); - return ret; + + set_serial_rule(s_rule, op, g2g->superior_group_id*1024*1024+g2g->group_id, 0, g2g->table_name, + line, absolute_expire_time); + TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); + batch->batch_size++; + return 0; +} +int Maat_command_batch_set_group2compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c) +{ + struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); + long long absolute_expire_time=0; + char line[MAX_TABLE_LINE_SIZE]; + + serialize_group2compile(op, g2c, line, sizeof(line)); + assert(g2c->group_id<1024*1024); + + set_serial_rule(s_rule, op, g2c->compile_id*1024*1024+g2c->group_id, 0, g2c->table_name, + line, absolute_expire_time); + TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); + batch->batch_size++; + return 0; +} +int Maat_command_batch_set_compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after) +{ + + struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); + long long absolute_expire_time=0; + char line[MAX_TABLE_LINE_SIZE]; + serialize_compile(compile, huge_service_defined, clause_num, op, line, sizeof(line)); + + if(expire_after>0) + { + absolute_expire_time=batch->server_time+expire_after; + } + set_serial_rule(s_rule, op, compile->config_id, label_id, table_name, + line, absolute_expire_time); + TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); + batch->batch_size++; + return 0; +} +int Maat_command_batch_commit(struct Maat_command_batch* batch) +{ + struct serial_rule_t* s_rule_array=ALLOC(struct serial_rule_t, batch->batch_size); + int i=0; + redisContext* write_ctx=get_redis_ctx_for_write(batch->feather); + struct serial_rule_t * tmp = TAILQ_FIRST(&batch->queue); + while(tmp != NULL) + { + TAILQ_REMOVE(&batch->queue, tmp, entries); + memcpy(s_rule_array+i, tmp, sizeof(*tmp)); + free(tmp); + tmp = TAILQ_FIRST(&batch->queue); + i++; + } + assert(i==batch->batch_size); + exec_serial_rule(write_ctx, s_rule_array, batch->batch_size, batch->server_time, batch->feather->logger); + for(i=0; ibatch_size; i++) + { + empty_serial_rules(s_rule_array+i); + } + free(s_rule_array); + free(batch); + return i; +} + +int Maat_command_raw_set_compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after) +{ + struct Maat_command_batch* batch=NULL; + batch=Maat_command_batch_new(feather); + Maat_command_batch_set_compile(batch, op, compile, table_name, huge_service_defined, clause_num, label_id, expire_after); + Maat_command_batch_commit(batch); + return 0; +} +int Maat_command_raw_set_region(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id) +{ + struct Maat_command_batch* batch=NULL; + batch=Maat_command_batch_new(feather); + Maat_command_batch_set_region(batch, op, region, group_id); + Maat_command_batch_commit(batch); + return 0; +} +int Maat_command_raw_set_group2compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c) +{ + struct Maat_command_batch* batch=NULL; + batch=Maat_command_batch_new(feather); + Maat_command_batch_set_group2compile(batch, op, g2c); + Maat_command_batch_commit(batch); + return 0; +} +int Maat_command_raw_set_group2group(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g) +{ + + struct Maat_command_batch* batch=NULL; + batch=Maat_command_batch_new(feather); + Maat_command_batch_set_group2group(batch, op, g2g); + Maat_command_batch_commit(batch); + return 0; } int Maat_cmd_flushDB(Maat_feather_t feather) diff --git a/src/inc_internal/Maat_rule_internal.h b/src/inc_internal/Maat_rule_internal.h index 2cc2ec0..a547fab 100644 --- a/src/inc_internal/Maat_rule_internal.h +++ b/src/inc_internal/Maat_rule_internal.h @@ -356,6 +356,7 @@ struct serial_rule_t //rm= Redis Maat char* table_line; int n_foreign; struct foreign_key* f_keys; + TAILQ_ENTRY(serial_rule_t) entries; }; int parse_accept_tag(const char* value, struct rule_tag** result, void* logger); diff --git a/test/perf_test_maatframe.cpp b/test/perf_test_maatframe.cpp index 372032e..68c21ac 100644 --- a/test/perf_test_maatframe.cpp +++ b/test/perf_test_maatframe.cpp @@ -5,19 +5,21 @@ #include int test_add_expr_command_copy(Maat_feather_t feather,const char* region_table,int config_id, int timeout,int label_id, const char* keywords) { - struct Maat_cmd_t* cmd=NULL; + struct Maat_cmd_group2compile g2c; + struct Maat_cmd_group2group g2g; struct Maat_rule_t rule; - char huge_serv_def[1024*2]; - memset(huge_serv_def,'s',sizeof(huge_serv_def)); - struct Maat_region_t region; + struct Maat_cmd_region region; int group_num=1,ret=0; memset(&rule,0,sizeof(rule)); rule.config_id=config_id; strcpy(rule.service_defined,"maat_command"); - //MUST acqire by function, because Maat_cmd_t has some hidden members. - cmd=Maat_create_cmd(&rule, group_num); - cmd->expire_after=timeout; - cmd->label_id=label_id; + + Maat_command_raw_set_compile(feather, MAAT_OP_ADD, &rule, "COMPILE", NULL, 1, timeout, label_id); + memset(g2c, 0, sizeof(g2c)); + g2c.group_id=config_id; + g2c.compile_id=config_id; + g2c.table_name="GROUP2COMPILE"; + Maat_command_raw_set_group2compile(feather, MAAT_OP_ADD, &g2c); memset(®ion,0,sizeof(region)); region.region_type=REGION_EXPR; region.table_name=region_table; @@ -26,18 +28,7 @@ int test_add_expr_command_copy(Maat_feather_t feather,const char* region_table,i region.expr_rule.expr_type=EXPR_TYPE_AND; region.expr_rule.match_method=MATCH_METHOD_SUB; region.expr_rule.hex_bin=UNCASE_PLAIN; - Maat_cmd_set_opt(cmd, MAAT_RULE_SERV_DEFINE, huge_serv_def, sizeof(huge_serv_def)); - Maat_add_region2cmd(cmd, 0, ®ion); - //use pipeline model. - ret=Maat_cmd_append(feather, cmd, MAAT_OP_ADD); - if(ret<0) - { - printf("Add Maat command %d failed.\n",rule.config_id); - Maat_free_cmd(cmd); - return 0; - } - //cmd has been saved in feather, so free cmd before commit is allowed. - Maat_free_cmd(cmd); + Maat_command_raw_set_region(feather, MAAT_OP_ADD, ®ion, g2c->group_id); return 0; } @@ -128,15 +119,9 @@ TEST_F(MaatCMDPerfTest, SetExpr200K) { test_add_expr_command_copy(feather,table_name,config_id-i, 0, label_id, keywords); } - ret=Maat_cmd_commit(feather); - EXPECT_TRUE(ret>=0); - wait_for_cmd_effective_copy(feather, version_before); - struct Maat_cmd_t* cmd=NULL; struct Maat_rule_t rule; memset(&rule,0,sizeof(rule)); - int *output_ids=(int*)malloc(sizeof(int)*CMD_EXPR_NUM); - output_id_cnt=Maat_cmd_select(feather,label_id, output_ids, CMD_EXPR_NUM); - EXPECT_EQ(output_id_cnt, CMD_EXPR_NUM); + for(i=0; i