测试一次写入20万lines和rule的性能。

This commit is contained in:
zhengchao
2018-12-06 21:11:51 +06:00
parent 953393b448
commit a92e7b4253
7 changed files with 224 additions and 105 deletions

View File

@@ -165,8 +165,9 @@ enum MAAT_INIT_OPT
int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const void* value,int size);
enum MAAT_STATE_OPT
{
MAAT_STATE_VERSION=1, //Get current maat version. VALUE is long long, SIZE=sizeof(long long).
MAAT_STATE_LAST_UPDATING_TABLE //Query at Maat_finish_callback_t to determine whether this table is the last one to update. VALUE is interger, SIZE=sizeof(int), 1:yes, 0: no
MAAT_STATE_VERSION=1, //Get current maat version, if maat is in update progress, the updating version is returned. VALUE is long long, SIZE=sizeof(long long).
MAAT_STATE_LAST_UPDATING_TABLE, //Query at Maat_finish_callback_t to determine whether this table is the last one to update. VALUE is interger, SIZE=sizeof(int), 1:yes, 0: no
MAAT_STATE_IN_UPDATING
};
int Maat_read_state(Maat_feather_t feather, enum MAAT_STATE_OPT type, void* value, int size);

View File

@@ -2260,6 +2260,20 @@ int Maat_read_state(Maat_feather_t feather,enum MAAT_STATE_OPT type, void* valu
case MAAT_STATE_LAST_UPDATING_TABLE:
*int_val=_feather->is_last_plugin_table_updating;
break;
case MAAT_STATE_IN_UPDATING:
if(size!=sizeof(int))
{
return -1;
}
if(0==pthread_mutex_trylock(&(_feather->backgroud_update_mutex)))
{
*int_val=0;
pthread_mutex_unlock(&(_feather->backgroud_update_mutex));
}
else
{
*int_val=1;
}
default:
return -1;
break;

View File

@@ -1213,21 +1213,21 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_
return;
}
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger)
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule, unsigned int serial_rule_num, long long server_time, void* logger)
{
int max_redis_batch=1*1024,batch_cnt=0;
int success_cnt=0,j=0,renew_allowed=0,last_failed=-1;
unsigned int max_redis_batch=1*1024, batch_cnt=0;
int renew_allowed=0,last_failed=-1;
redisReply*transaction_reply=NULL,*p=NULL;
unsigned int i=0;
unsigned int multi_cmd_cnt=0;
unsigned int multi_cmd_cnt=0, success_cnt=0;
const int MAX_REDIS_OP_PER_SRULE=8;
unsigned int max_multi_cmd_num=MAX_REDIS_OP_PER_SRULE*serial_rule_num+2;// 2 for operation in _exec_serial_rule_end()
struct expected_reply_t *expected_reply=(struct expected_reply_t*)calloc(sizeof(struct expected_reply_t), max_multi_cmd_num);
long long new_version=0;
int renew_num=0,ret=0;
for(i=0;i<(unsigned int)serial_rule_num;i++)
for(i=0;i<serial_rule_num;i++)
{
if(s_rule[i].op==MAAT_OP_RENEW_TIMEOUT)
{
@@ -1243,7 +1243,7 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_r
}
while(success_cnt<serial_rule_num)
{
batch_cnt=MIN(serial_rule_num-success_cnt,max_redis_batch);
batch_cnt=MIN(serial_rule_num-success_cnt, max_redis_batch);
_exec_serial_rule(ctx,new_version,s_rule+success_cnt,batch_cnt,expected_reply, &multi_cmd_cnt,success_cnt,renew_allowed);
assert(multi_cmd_cnt<max_multi_cmd_num);
success_cnt+=batch_cnt;
@@ -1262,11 +1262,11 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_r
{
continue;
}
assert(j<serial_rule_num);
assert(i<(unsigned int)serial_rule_num);
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command
,"%s %s %d failed, rule id maybe conflict or not exist."
,rm_op_str[s_rule[j].op]
,s_rule[j].table_name,s_rule[j].rule_id);
,rm_op_str[s_rule[i].op]
,s_rule[i].table_name,s_rule[i].rule_id);
success_cnt--;
last_failed=expected_reply[i].srule_seq;
}

View File

@@ -468,7 +468,7 @@ int plugin_EX_data_free(const struct Maat_table_desc* plugin_table, const char*
void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_id,int label_id,const char* table_name,const char* line, long long timeout);
void empty_serial_rules(struct serial_rule_t* rule);
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger);
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,unsigned int serial_rule_num, long long server_time, void* logger);
long long redis_server_time(redisContext* ctx);
redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db, void* logger);
char* md5_file(const char* filename, char* md5string);

View File

@@ -675,35 +675,36 @@ int test_table_conjunction(Maat_feather_t feather,const char* table_name,const c
}
return 0;
}
#define TEST_CMD_LINE_NUM 4
#define TEST_CMD_LINE_NUM 200*1000
void test_set_cmd_line(Maat_feather_t feather)
{
const struct Maat_line_t *p_line[TEST_CMD_LINE_NUM];
struct Maat_line_t line_rule[TEST_CMD_LINE_NUM];
char table_line[TEST_CMD_LINE_NUM][128];
struct Maat_line_t **p_line=(struct Maat_line_t **)calloc(sizeof(struct Maat_line_t *), TEST_CMD_LINE_NUM);
struct Maat_line_t *line_rule=(struct Maat_line_t *)calloc(sizeof(struct Maat_line_t), TEST_CMD_LINE_NUM);
int i=0;
const char* line="1\t192.168.0.1\t4444444444\t1";
memset(&line_rule,0,sizeof(line_rule));
for(i=0;i<TEST_CMD_LINE_NUM;i++)
{
line_rule[i].label_id=0;
line_rule[i].rule_id=(int)Maat_cmd_incrby(feather,"TEST_PLUG_SEQ", 1);
line_rule[i].table_name="QD_ENTRY_INFO";
snprintf(table_line[i],sizeof(table_line[i]),"1\t192.168.0.1\t%d\t1",100+i);
line_rule[i].table_line=table_line[i];
line_rule[i].table_line=line;
// asprintf(&(line_rule[i].table_line),"1\t192.168.0.1\t%d\t1",100+i);
line_rule[i].expire_after=0;
p_line[i]=line_rule+i;
}
Maat_cmd_set_lines(feather, p_line,TEST_CMD_LINE_NUM, MAAT_OP_ADD);
Maat_cmd_set_lines(feather,(const struct Maat_line_t **)p_line,TEST_CMD_LINE_NUM, MAAT_OP_ADD);
usleep(WAIT_FOR_EFFECTIVE_US);
for(i=0;i<TEST_CMD_LINE_NUM;i++)
{
line_rule[i].table_line=NULL;
}
Maat_cmd_set_lines(feather, p_line,TEST_CMD_LINE_NUM, MAAT_OP_DEL);
Maat_cmd_set_lines(feather, (const struct Maat_line_t **)p_line,TEST_CMD_LINE_NUM, MAAT_OP_DEL);
free(p_line);
free(line_rule);
return;
}

View File

@@ -38,6 +38,19 @@ int g_iThreadNum=4;
const char* table_info_path="./table_info.conf";
int scan_interval_ms=1;
int effective_interval_ms=0;
void wait_for_cmd_effective(Maat_feather_t feather, long long version_before)
{
long long version_after=version_before;
int is_updating=1;
while(is_updating||version_before==version_after)
{
Maat_read_state(feather,MAAT_STATE_IN_UPDATING, &is_updating, sizeof(is_updating));
Maat_read_state(feather,MAAT_STATE_VERSION, &version_after, sizeof(version_after));
usleep(1000*100);//waiting for commands go into effect
}
}
void scan_with_old_or_new_cfg(Maat_feather_t feather, int hit_old)
{
@@ -1033,6 +1046,95 @@ int del_command(Maat_feather_t feather,int config_id)
Maat_free_cmd(cmd);
return 0;
}
TEST_F(MaatCmdTest, SetIP)
{
struct Maat_cmd_t* cmd=NULL;
struct Maat_rule_t rule;
int config_id=0,timeout=4;
long long version_before=0;
const char* region_table="IP_CONFIG";
struct Maat_region_t region;
int group_num=1,ret=0;
memset(&rule,0,sizeof(rule));
Maat_feather_t feather=MaatCmdTest::_shared_feather;
//MUST acquire by Maat_cmd_incrby to guarantee a unique compile ID.
config_id=(int)Maat_cmd_incrby(feather, "TEST_SEQ", 1);
rule.config_id=config_id;
strcpy(rule.service_defined,"maat_command");
//MUST acqire by function, because Maat_cmd_t has some hidden members.
cmd=Maat_create_cmd(&rule, group_num);
cmd->expire_after=timeout;
cmd->label_id=0; //no lable
memset(&region,0,sizeof(region));
region.region_type=REGION_IP;
region.table_name=region_table;
region.ip_rule.addr_type=ADDR_TYPE_IPv4;
region.ip_rule.direction=ADDR_DIR_DOUBLE;
region.ip_rule.src_ip="172.0.0.1";
region.ip_rule.mask_src_ip="255.255.255.255";
region.ip_rule.src_port=53331;
region.ip_rule.mask_src_port=0;//means any port should hit.
region.ip_rule.dst_ip="172.0.0.2";
region.ip_rule.mask_dst_ip="255.255.255.255";
region.ip_rule.dst_port=80;
region.ip_rule.mask_dst_port=65535;
region.ip_rule.protocol=0;//means any protocol should hit.
Maat_add_region2cmd(cmd, 0, &region);
ret=Maat_read_state(feather,MAAT_STATE_VERSION, &version_before, sizeof(version_before));
EXPECT_EQ(ret, 0);
ret=Maat_cmd(feather, cmd, MAAT_OP_ADD);
EXPECT_GE(ret, 0);
Maat_free_cmd(cmd);
cmd=NULL;
wait_for_cmd_effective(feather, version_before);
struct ipaddr ipv4_addr;
struct stream_tuple4_v4 v4_addr;
ipv4_addr.addrtype=ADDR_TYPE_IPV4;
inet_pton(AF_INET,region.ip_rule.src_ip,&(v4_addr.saddr));
v4_addr.source=htons(region.ip_rule.src_port+1);//Not use the exactly port for testing port mask.
inet_pton(AF_INET,region.ip_rule.dst_ip,&(v4_addr.daddr));
v4_addr.dest=htons(region.ip_rule.dst_port);
ipv4_addr.v4=&v4_addr;
int table_id=0;
struct Maat_rule_t result;
memset(&result, 0, sizeof(result));
scan_status_t mid=NULL;
table_id=Maat_table_register(feather,region_table);
ASSERT_GE(table_id, 0);
ret=Maat_scan_proto_addr(feather,table_id,&ipv4_addr,6,&result,1, &mid,0);
EXPECT_EQ(ret, 1);
EXPECT_EQ(result.config_id, config_id);
Maat_clean_status(&mid);
ret=Maat_read_state(feather,MAAT_STATE_VERSION, &version_before, sizeof(version_before));
//reset timeout.
cmd=Maat_create_cmd(&rule, 0);
cmd->expire_after=10;
ret=Maat_cmd(feather, cmd, MAAT_OP_RENEW_TIMEOUT);
EXPECT_EQ(ret ,1);
wait_for_cmd_effective(feather, version_before);
Maat_free_cmd(cmd);
cmd=NULL;
ret=Maat_scan_proto_addr(feather,table_id,&ipv4_addr,6,&result,1, &mid,0);
EXPECT_EQ(ret, 1);
Maat_clean_status(&mid);
return;
}
TEST_F(MaatCmdTest, SetExpr)
{
@@ -1107,6 +1209,56 @@ TEST_F(MaatCmdTest, SetExpr)
&mid, 0);
EXPECT_EQ(ret, 0);
}
TEST_F(MaatCmdTest, SetExpr20w)
{
const int CMD_EXPR_NUM=200*1000;
const char* table_name="HTTP_URL";
const char* keywords1="Hiredis";
const char* keywords2="C Client";
char escape_buff1[256],escape_buff2[256];
char keywords[256];
int label_id=5210, config_id=0,ret=0, output_id_cnt=0;
Maat_feather_t feather=MaatCmdTest::_shared_feather;
long long version_before=0;
ret=Maat_read_state(feather,MAAT_STATE_VERSION, &version_before, sizeof(version_before));
Maat_str_escape(escape_buff1, sizeof(escape_buff1),keywords1);
Maat_str_escape(escape_buff2, sizeof(escape_buff2),keywords2);
snprintf(keywords,sizeof(keywords),"%s&%s",escape_buff1,escape_buff2);
config_id=(int)Maat_cmd_incrby(feather, "TEST_SEQ", CMD_EXPR_NUM);
int i=0;
for(i=0; i<CMD_EXPR_NUM;i++)
{
test_add_expr_command(feather,table_name,config_id-i, 0, label_id, keywords);
}
ret=Maat_cmd_commit(feather);
EXPECT_TRUE(ret>=0);
wait_for_cmd_effective(feather, version_before);
struct Maat_cmd_t* cmd=NULL;
struct Maat_rule_t rule;
memset(&rule,0,sizeof(rule));
int *output_ids=(int*)malloc(sizeof(int)*CMD_EXPR_NUM);
output_id_cnt=Maat_cmd_select(feather,label_id, output_ids, CMD_EXPR_NUM);
EXPECT_EQ(output_id_cnt, CMD_EXPR_NUM);
for(i=0; i<CMD_EXPR_NUM;i++)
{
memset(&rule,0,sizeof(rule));
rule.config_id=output_ids[i];
cmd=Maat_create_cmd(&rule, 0);
ret=Maat_cmd_append(feather, cmd, MAAT_OP_DEL);
EXPECT_EQ(ret, 0);
Maat_free_cmd(cmd);
}
ret=Maat_cmd_commit(feather);
EXPECT_EQ(ret, CMD_EXPR_NUM);
free(output_ids);
}
TEST_F(MaatCmdTest, SetLines)
{
const int TEST_CMD_LINE_NUM=4;
@@ -1142,95 +1294,46 @@ TEST_F(MaatCmdTest, SetLines)
return;
}
TEST_F(MaatCmdTest, SetIP)
TEST_F(MaatCmdTest, SetLines20w)
{
struct Maat_cmd_t* cmd=NULL;
struct Maat_rule_t rule;
int config_id=0,timeout=4;
long long version_before=0,version_after=0;
const char* region_table="IP_CONFIG";
struct Maat_region_t region;
int group_num=1,ret=0;
memset(&rule,0,sizeof(rule));
const int TEST_CMD_LINE_NUM=200*1000;
Maat_feather_t feather=MaatCmdTest::_shared_feather;
//MUST acquire by Maat_cmd_incrby to guarantee a unique compile ID.
config_id=(int)Maat_cmd_incrby(feather, "TEST_SEQ", 1);
rule.config_id=config_id;
struct Maat_line_t **p_line=(struct Maat_line_t **)calloc(sizeof(struct Maat_line_t *), TEST_CMD_LINE_NUM);
struct Maat_line_t *line_rule=(struct Maat_line_t *)calloc(sizeof(struct Maat_line_t), TEST_CMD_LINE_NUM);
int i=0;
const char* line="1\t192.168.0.1\t4444444444\t1";
int seq=(int)Maat_cmd_incrby(feather,"TEST_PLUG_SEQ", TEST_CMD_LINE_NUM);
for(i=0;i<TEST_CMD_LINE_NUM;i++)
{
line_rule[i].label_id=0;
line_rule[i].rule_id=seq-i;
line_rule[i].table_name="QD_ENTRY_INFO";
line_rule[i].table_line=line;
// asprintf(&(line_rule[i].table_line),"1\t192.168.0.1\t%d\t1",100+i);
line_rule[i].expire_after=0;
p_line[i]=line_rule+i;
}
strcpy(rule.service_defined,"maat_command");
//MUST acqire by function, because Maat_cmd_t has some hidden members.
cmd=Maat_create_cmd(&rule, group_num);
cmd->expire_after=timeout;
cmd->label_id=0; //no lable
memset(&region,0,sizeof(region));
region.region_type=REGION_IP;
region.table_name=region_table;
region.ip_rule.addr_type=ADDR_TYPE_IPv4;
region.ip_rule.direction=ADDR_DIR_DOUBLE;
region.ip_rule.src_ip="172.0.0.1";
region.ip_rule.mask_src_ip="255.255.255.255";
region.ip_rule.src_port=53331;
region.ip_rule.mask_src_port=0;//means any port should hit.
long long version_before=0;
Maat_read_state(feather,MAAT_STATE_VERSION, &version_before, sizeof(version_before));
region.ip_rule.dst_ip="172.0.0.2";
region.ip_rule.mask_dst_ip="255.255.255.255";
region.ip_rule.dst_port=80;
region.ip_rule.mask_dst_port=65535;
region.ip_rule.protocol=0;//means any protocol should hit.
Maat_add_region2cmd(cmd, 0, &region);
Maat_cmd_set_lines(feather,(const struct Maat_line_t **)p_line,TEST_CMD_LINE_NUM, MAAT_OP_ADD);
ret=Maat_read_state(feather,MAAT_STATE_VERSION, &version_before, sizeof(version_before));
EXPECT_EQ(ret, 0);
ret=Maat_cmd(feather, cmd, MAAT_OP_ADD);
EXPECT_GE(ret, 0);
Maat_free_cmd(cmd);
cmd=NULL;
wait_for_cmd_effective(feather, version_before);
//TEST if the command go into effective.
usleep(WAIT_FOR_EFFECTIVE_US); //waiting for commands go into effect
ret=Maat_read_state(feather,MAAT_STATE_VERSION, &version_after, sizeof(version_after));
struct ipaddr ipv4_addr;
struct stream_tuple4_v4 v4_addr;
ipv4_addr.addrtype=ADDR_TYPE_IPV4;
inet_pton(AF_INET,region.ip_rule.src_ip,&(v4_addr.saddr));
v4_addr.source=htons(region.ip_rule.src_port+1);//Not use the exactly port for testing port mask.
inet_pton(AF_INET,region.ip_rule.dst_ip,&(v4_addr.daddr));
v4_addr.dest=htons(region.ip_rule.dst_port);
ipv4_addr.v4=&v4_addr;
int table_id=0;
struct Maat_rule_t result;
memset(&result, 0, sizeof(result));
scan_status_t mid=NULL;
table_id=Maat_table_register(feather,region_table);
ASSERT_GE(table_id, 0);
ret=Maat_scan_proto_addr(feather,table_id,&ipv4_addr,6,&result,1, &mid,0);
EXPECT_EQ(ret, 1);
EXPECT_EQ(result.config_id, config_id);
Maat_clean_status(&mid);
//reset timeout.
cmd=Maat_create_cmd(&rule, 0);
cmd->expire_after=10;
ret=Maat_cmd(feather, cmd, MAAT_OP_RENEW_TIMEOUT);
EXPECT_EQ(ret ,1);
usleep(2*1000*1000+WAIT_FOR_EFFECTIVE_US);//wait for commands expired.
Maat_free_cmd(cmd);
cmd=NULL;
ret=Maat_scan_proto_addr(feather,table_id,&ipv4_addr,6,&result,1, &mid,0);
EXPECT_EQ(ret, 1);
Maat_clean_status(&mid);
for(i=0;i<TEST_CMD_LINE_NUM;i++)
{
line_rule[i].table_line=NULL;
}
Maat_cmd_set_lines(feather, (const struct Maat_line_t **)p_line,TEST_CMD_LINE_NUM, MAAT_OP_DEL);
free(p_line);
free(line_rule);
return;
}
int g_test_update_paused=0;
void pause_update_test_entry_cb(int table_id,const char* table_line,void* u_para)
{

View File

@@ -173,7 +173,7 @@ clean_up:
}
int count_line_num(const char* table_name,const char* line,void *u_para)
{
(*((int *)u_para))++;
(*((unsigned int *)u_para))++;
return 0;
}
int line_idx=0;
@@ -221,7 +221,7 @@ int make_serial_rule(const char* table_name,const char* line,void *u_para)
#define WORK_MODE_JSON 1
int main(int argc, char * argv[])
{
int oc=0,ret=0, i=0,success_cnt=0;
int oc=0,ret=0;
int model=0;
char redis_ip[64];
int redis_port=6379;
@@ -230,7 +230,7 @@ int main(int argc, char * argv[])
char dump_dir[128], json_file[128], tmp_iris_path[128];
strncpy(dump_dir,redis_dump_dir,sizeof(dump_dir));
redisContext * ctx=NULL;
int total_line_cnt=0;
unsigned int total_line_cnt=0, success_cnt=0, i=0;
int timeout=0;
FILE* json_fp=NULL;
cJSON *json=NULL, *tmp_obj=NULL;