diff --git a/inc/Maat_command.h b/inc/Maat_command.h index 9c4ee5c..16bac35 100644 --- a/inc/Maat_command.h +++ b/inc/Maat_command.h @@ -185,8 +185,8 @@ int Maat_command_batch_set_group2compile(struct Maat_command_batch* batch, enum int Maat_command_batch_set_compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after); int Maat_command_batch_commit(struct Maat_command_batch* batch); -int Maat_cmd_get_new_group_id(Maat_feather_t feather); -int Maat_cmd_get_new_region_id(Maat_feather_t feather); +int Maat_command_get_new_group_id(Maat_feather_t feather); +int Maat_command_get_new_region_id(Maat_feather_t feather); #ifdef __cplusplus } //end extern"C" diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index 12659d7..fbdcf77 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -97,7 +97,7 @@ int connect_redis_for_write(struct source_redis_ctx* mr_ctx, void* logger) return 0; } } -redisContext* get_redis_ctx_for_write(_Maat_feather_t * feather) +redisContext* get_redis_ctx_for_write(struct _Maat_feather_t * feather) { int ret=0; if(feather->mr_ctx.write_ctx==NULL) @@ -361,7 +361,7 @@ void empty_serial_rules(struct serial_rule_t* rule) memset(rule,0,sizeof(struct serial_rule_t)); return; } -void set_serial_rule(struct serial_rule_t* rule, enum MAAT_OPERATION op,int rule_id,int label_id,const char* table_name,const char* line, long long timeout) +void set_serial_rule(struct serial_rule_t* rule, enum MAAT_OPERATION op, unsigned long rule_id,int label_id,const char* table_name,const char* line, long long timeout) { memset(rule, 0, sizeof(struct serial_rule_t)); rule->op=op; @@ -430,7 +430,7 @@ int get_inc_key_list(long long instance_version, long long target_version, redis for(i=0, j=0;ielements;i++) { assert(reply->element[i]->type==REDIS_REPLY_STRING); - ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%ld",op_str,s_rule[j].table_name,&(s_rule[j].rule_id)); + ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%lu",op_str,s_rule[j].table_name,&(s_rule[j].rule_id)); if(ret!=3||s_rule[i].rule_id<0) { MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, @@ -989,7 +989,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s switch(s_rule[i].op) { case MAAT_OP_ADD: - redisAppendCommand(ctx,"SET %s:%s,%d %s", + redisAppendCommand(ctx,"SET %s:%s,%lu %s", mr_key_prefix[MAAT_OP_ADD], s_rule[i].table_name, s_rule[i].rule_id, @@ -998,7 +998,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s (*cnt)++; append_cmd_cnt++; //Allowing add duplicated members for rule id recycling. - redisAppendCommand(ctx,"RPUSH %s ADD,%s,%d", + redisAppendCommand(ctx,"RPUSH %s ADD,%s,%lu", transaction_list, s_rule[i].table_name, s_rule[i].rule_id); @@ -1007,7 +1007,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s append_cmd_cnt++; if(s_rule[i].timeout>0) { - redisAppendCommand(ctx,"ZADD %s %lld %s,%d", + redisAppendCommand(ctx,"ZADD %s %lld %s,%lu", mr_expire_sset, s_rule[i].timeout, s_rule[i].table_name, @@ -1019,7 +1019,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s } if(s_rule[i].label_id>0) { - redisAppendCommand(ctx,"ZADD %s %d %s,%d", + redisAppendCommand(ctx,"ZADD %s %d %s,%lu", mr_label_sset, s_rule[i].label_id, s_rule[i].table_name, @@ -1033,7 +1033,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s } break; case MAAT_OP_DEL: - redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d", + redisAppendCommand(ctx,"RENAME %s:%s,%lu %s:%s,%lu", mr_key_prefix[MAAT_OP_ADD], s_rule[i].table_name, s_rule[i].rule_id, @@ -1045,7 +1045,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s (*cnt)++; append_cmd_cnt++; - redisAppendCommand(ctx,"EXPIRE %s:%s,%d %d", + redisAppendCommand(ctx,"EXPIRE %s:%s,%lu %d", mr_key_prefix[MAAT_OP_DEL], s_rule[i].table_name, s_rule[i].rule_id, @@ -1055,7 +1055,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s append_cmd_cnt++; //NX: Don't update already exisiting elements. Always add new elements. - redisAppendCommand(ctx,"RPUSH %s DEL,%s,%d", + redisAppendCommand(ctx,"RPUSH %s DEL,%s,%lu", transaction_list, s_rule[i].table_name, s_rule[i].rule_id); @@ -1064,7 +1064,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s append_cmd_cnt++; // Try to remove from expiration sorted set, no matter wheather it exists or not. - redisAppendCommand(ctx,"ZREM %s %s,%d", + redisAppendCommand(ctx,"ZREM %s %s,%lu", mr_expire_sset, s_rule[i].table_name, s_rule[i].rule_id); @@ -1073,7 +1073,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s append_cmd_cnt++; // Try to remove from label sorted set, no matter wheather it exists or not. - redisAppendCommand(ctx,"ZREM %s %s,%d", + redisAppendCommand(ctx,"ZREM %s %s,%lu", mr_label_sset, s_rule[i].table_name, s_rule[i].rule_id); @@ -1087,7 +1087,7 @@ void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct s continue; } //s_rule[i].timeout>0 was checked by caller. - redisAppendCommand(ctx,"ZADD %s %lld %s,%d", + redisAppendCommand(ctx,"ZADD %s %lld %s,%lu", mr_expire_sset, s_rule[i].timeout, s_rule[i].table_name, @@ -2051,13 +2051,13 @@ long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment) freeReplyObject(data_reply); return result; } -int Maat_cmd_get_new_group_id(Maat_feather_t feather) +int Maat_command_get_new_group_id(Maat_feather_t feather) { int group_id=0; group_id=(int) Maat_cmd_incrby(feather, mr_group_id_var, 1); return group_id; } -int Maat_cmd_get_new_region_id(Maat_feather_t feather) +int Maat_command_get_new_region_id(Maat_feather_t feather) { int region_id=0; region_id=(int) Maat_cmd_incrby(feather, mr_region_id_var, 1); @@ -2212,6 +2212,7 @@ int Maat_command_batch_set_region(struct Maat_command_batch* batch, enum MAAT_O struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); long long absolute_expire_time=0; char line[MAX_TABLE_LINE_SIZE]; + serialize_region(region, group_id, line, sizeof(line)); set_serial_rule(s_rule, op, region->region_id, 0, region->table_name, @@ -2221,17 +2222,19 @@ int Maat_command_batch_set_region(struct Maat_command_batch* batch, enum MAAT_O return 0; } +#define TO_GROUP2X_KEY(group_id, parent_id) ((unsigned long)group_id<<32|parent_id) + int Maat_command_batch_set_group2group(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g) { struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); long long absolute_expire_time=0; char line[MAX_TABLE_LINE_SIZE]; - assert(g2g->group_id<1024*1024); serialize_group2group(op, g2g, line, sizeof(line)); - set_serial_rule(s_rule, op, g2g->superior_group_id*1024*1024+g2g->group_id, 0, g2g->table_name, + set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2g->group_id, g2g->superior_group_id), 0, g2g->table_name, line, absolute_expire_time); + TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); batch->batch_size++; return 0; @@ -2242,10 +2245,8 @@ int Maat_command_batch_set_group2compile(struct Maat_command_batch* batch, enum long long absolute_expire_time=0; char line[MAX_TABLE_LINE_SIZE]; - serialize_group2compile(op, g2c, line, sizeof(line)); - assert(g2c->group_id<1024*1024); - - set_serial_rule(s_rule, op, g2c->compile_id*1024*1024+g2c->group_id, 0, g2c->table_name, + serialize_group2compile(op, g2c, line, sizeof(line)); + set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2c->group_id, g2c->compile_id), 0, g2c->table_name, line, absolute_expire_time); TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); batch->batch_size++; @@ -2264,7 +2265,8 @@ int Maat_command_batch_set_compile(struct Maat_command_batch* batch, enum MAAT_O absolute_expire_time=batch->server_time+expire_after; } set_serial_rule(s_rule, op, compile->config_id, label_id, table_name, - line, absolute_expire_time); + line, absolute_expire_time); + TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); batch->batch_size++; return 0; @@ -2275,10 +2277,11 @@ int Maat_command_batch_commit(struct Maat_command_batch* batch) int i=0; redisContext* write_ctx=get_redis_ctx_for_write(batch->feather); struct serial_rule_t * tmp = TAILQ_FIRST(&batch->queue); + while(tmp != NULL) { TAILQ_REMOVE(&batch->queue, tmp, entries); - memcpy(s_rule_array+i, tmp, sizeof(*tmp)); + memcpy(s_rule_array+i, tmp, sizeof(*tmp)); free(tmp); tmp = TAILQ_FIRST(&batch->queue); i++; diff --git a/src/entry/json2iris.cpp b/src/entry/json2iris.cpp index cbba8bc..101660e 100644 --- a/src/entry/json2iris.cpp +++ b/src/entry/json2iris.cpp @@ -1200,7 +1200,7 @@ int write_iris(cJSON *json, struct iris_description_t *p_iris, void* logger) "compile rule %d have no groups.",compile_id); return -1; } - i=1; + i=0; cJSON_ArrayForEach(group_obj, group_array) { ret=write_group_rule(group_obj, compile_id, PARENT_TYPE_COMPILE, compile_id, i, p_iris, logger); diff --git a/src/inc_internal/Maat_rule_internal.h b/src/inc_internal/Maat_rule_internal.h index a547fab..74c3690 100644 --- a/src/inc_internal/Maat_rule_internal.h +++ b/src/inc_internal/Maat_rule_internal.h @@ -348,7 +348,7 @@ struct foreign_key struct serial_rule_t //rm= Redis Maat { enum MAAT_OPERATION op;//0: delete, 1: add. - long rule_id; + unsigned long rule_id; int label_id; char with_error; long long timeout; // absolute unix time. @@ -386,7 +386,7 @@ MAAT_RULE_EX_DATA rule_ex_data_new(const struct Maat_rule_head * rule_head, cons void rule_ex_data_free(const struct Maat_rule_head * rule_head, const char* srv_def, MAAT_RULE_EX_DATA *ad, const struct compile_ex_data_idx* ex_desc); -void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_id,int label_id,const char* table_name,const char* line, long long timeout); +void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op, unsigned long rule_id,int label_id,const char* table_name,const char* line, long long timeout); void empty_serial_rules(struct serial_rule_t* rule); int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,unsigned int serial_rule_num, long long server_time, void* logger); long long redis_server_time(redisContext* ctx); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7fb80b9..c57ae7b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -7,8 +7,8 @@ target_link_libraries(test_igraph igraph-static) add_executable(test_maatframe test_maatframe.cpp) target_link_libraries(test_maatframe maat_frame_shared gtest) -#add_executable(perf_test_maatframe perf_test_maatframe.cpp) -#target_link_libraries(perf_test_maatframe maat_frame_shared gtest) +add_executable(perf_test_maatframe perf_test_maatframe.cpp) +target_link_libraries(perf_test_maatframe maat_frame_shared gtest) configure_file(table_info.conf table_info.conf COPYONLY) configure_file(file_test_tableinfo.conf file_test_tableinfo.conf COPYONLY) diff --git a/test/perf_test_maatframe.cpp b/test/perf_test_maatframe.cpp index 68c21ac..444c31b 100644 --- a/test/perf_test_maatframe.cpp +++ b/test/perf_test_maatframe.cpp @@ -3,24 +3,93 @@ #include "Maat_command.h" #include #include -int test_add_expr_command_copy(Maat_feather_t feather,const char* region_table,int config_id, int timeout,int label_id, const char* keywords) +#include +#include + +void ipv4_addr_set_copy(struct ipaddr *ipv4_addr, struct stream_tuple4_v4* v4_addr, + const char* src_ip, unsigned short sport, const char* dest_ip, unsigned short dport) +{ + ipv4_addr->addrtype=ADDR_TYPE_IPV4; + inet_pton(AF_INET, src_ip, &(v4_addr->saddr)); + v4_addr->source=htons(sport); + inet_pton(AF_INET, dest_ip, &(v4_addr->daddr)); + v4_addr->dest=htons(dport); + ipv4_addr->v4=v4_addr; + return; +} + + +void random_keyword_generate(char* keyword_buf, size_t sz) +{ +#define MIN_KEYWORD_LEN 4 + size_t i=0, len=0; + len=random()%(sz-1-MIN_KEYWORD_LEN)+MIN_KEYWORD_LEN; + for(i=0; igroup_id); + Maat_command_batch_set_region(batch, MAAT_OP_ADD, ®ion, g2c.group_id); return 0; - } void wait_for_cmd_effective_copy(Maat_feather_t feather, long long version_before) { @@ -63,7 +131,7 @@ protected: int scan_interval_ms=500; int effective_interval_ms=0; - logger=MESA_create_runtime_log_handle("test_maat_redis.log",0); + logger=MESA_create_runtime_log_handle("maat_perf_test.log",0); _shared_feather=Maat_feather(g_iThreadNum, table_info_path, logger); Maat_set_feather_opt(_shared_feather,MAAT_OPT_INSTANCE_NAME,"perf", strlen("perf")+1); @@ -96,83 +164,62 @@ void* MaatCMDPerfTest::logger; //Following tests must be coded/tested at last, for they stalled the maat update thread and interrupt other tests. TEST_F(MaatCMDPerfTest, SetExpr200K) { - const int CMD_EXPR_NUM=200*1000; - const char* table_name="HTTP_URL"; + const int CMD_EXPR_NUM=1*1000*1000; + const int CMD_IP_NUM=1*1000*1000; + const char* expr_table_name="HTTP_URL"; + const char* ip_table_name="IP_CONFIG"; - const char* keywords1="Hiredis"; - const char* keywords2="C Client"; - char escape_buff1[256],escape_buff2[256]; - char keywords[256]; - - int label_id=5210, config_id=0,ret=0, output_id_cnt=0; + char keyword_buf[128]; + char src_ip_buff[32], dst_ip_buff[32]; + + int config_id=0, ret=0; Maat_feather_t feather=MaatCMDPerfTest::_shared_feather; long long version_before=0; - ret=Maat_read_state(feather,MAAT_STATE_VERSION, &version_before, sizeof(version_before)); - - Maat_str_escape(escape_buff1, sizeof(escape_buff1),keywords1); - Maat_str_escape(escape_buff2, sizeof(escape_buff2),keywords2); - snprintf(keywords,sizeof(keywords),"%s&%s",escape_buff1,escape_buff2); - - config_id=(int)Maat_cmd_incrby(feather, "TEST_SEQ", CMD_EXPR_NUM); + ret=Maat_read_state(feather, MAAT_STATE_VERSION, &version_before, sizeof(version_before)); + + config_id=(int)Maat_cmd_incrby(feather, "TEST_SEQ", CMD_EXPR_NUM+CMD_IP_NUM); int i=0; + struct Maat_command_batch* batch=NULL; + batch=Maat_command_batch_new(feather); for(i=0; i