✨ feat(mq): mq_runtime_clean in mq.h
This commit is contained in:
@@ -256,13 +256,135 @@ TEST(mq_runtime, basic_pub_sub) {
|
||||
mq_runtime_dispatch(rt);
|
||||
}
|
||||
|
||||
mq_runtime_free(rt);
|
||||
mq_schema_free(s);
|
||||
|
||||
EXPECT_EQ(N_packet*2*topic_id_num, env.msg_pub_cnt);
|
||||
EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
|
||||
EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
|
||||
|
||||
mq_runtime_free(rt);
|
||||
mq_schema_free(s);
|
||||
|
||||
}
|
||||
|
||||
struct test_pub_on_free_env
|
||||
{
|
||||
mq_schema *s;
|
||||
mq_runtime *rt;
|
||||
int N_round;
|
||||
int current_round;
|
||||
int topic_id;
|
||||
int on_msg_free_called;
|
||||
int on_msg_called;
|
||||
};
|
||||
|
||||
|
||||
static void test_pub_on_msg_free(void *msg, void *msg_free_arg)
|
||||
{
|
||||
struct test_pub_on_free_env *env = (struct test_pub_on_free_env *)msg_free_arg;
|
||||
env->on_msg_free_called+=1;
|
||||
if(env->current_round==env->N_round-1 && (long)msg!=env->N_round)
|
||||
{
|
||||
EXPECT_EQ(mq_runtime_publish_message(env->rt, env->topic_id, (void *)(long)(env->N_round)), -1);//on message free, publish always failed
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
static void test_pub_on_free_on_msg(int topic_id, void *msg, void *sub_arg)
|
||||
{
|
||||
struct test_pub_on_free_env *env = (struct test_pub_on_free_env *)sub_arg;
|
||||
env->on_msg_called+=1;
|
||||
if((int)(long)msg==env->N_round)
|
||||
{
|
||||
EXPECT_EQ(env->on_msg_called, env->N_round+1);
|
||||
EXPECT_EQ(env->current_round, env->N_round-1);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
TEST(mq_runtime, pub_on_msg_free)
|
||||
{
|
||||
struct test_pub_on_free_env env={};
|
||||
env.s=mq_schema_new();
|
||||
EXPECT_TRUE(env.s!=NULL);
|
||||
env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_pub_on_msg_free , &env);
|
||||
EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_pub_on_free_on_msg, &env), 0);
|
||||
|
||||
env.N_round=10;
|
||||
env.rt=mq_runtime_new(env.s);
|
||||
EXPECT_TRUE(env.rt!=NULL);
|
||||
for(int i=0; i<env.N_round;i++)
|
||||
{
|
||||
env.current_round=i;
|
||||
EXPECT_EQ(mq_runtime_publish_message(env.rt, env.topic_id, (void *)(long)i), 0);
|
||||
mq_runtime_dispatch(env.rt);
|
||||
}
|
||||
|
||||
mq_runtime_free(env.rt);
|
||||
mq_schema_free(env.s);
|
||||
|
||||
EXPECT_EQ(env.on_msg_free_called, env.N_round);
|
||||
EXPECT_EQ(env.on_msg_called, env.N_round);
|
||||
}
|
||||
|
||||
|
||||
struct test_dispatch_on_msg_env
|
||||
{
|
||||
mq_schema *s;
|
||||
mq_runtime *rt;
|
||||
int N_round;
|
||||
int current_round;
|
||||
int topic_id;
|
||||
int on_msg_free_called;
|
||||
int on_msg_called;
|
||||
};
|
||||
|
||||
|
||||
static void test_dispatch_on_msg_free(void *msg, void *msg_free_arg)
|
||||
{
|
||||
struct test_dispatch_on_msg_env *env = (struct test_dispatch_on_msg_env *)msg_free_arg;
|
||||
env->on_msg_free_called+=1;
|
||||
return;
|
||||
}
|
||||
|
||||
static void test_dispatch_on_msg(int topic_id, void *msg, void *sub_arg)
|
||||
{
|
||||
struct test_dispatch_on_msg_env *env = (struct test_dispatch_on_msg_env *)sub_arg;
|
||||
env->on_msg_called+=1;
|
||||
if(env->current_round==(long)msg)
|
||||
{
|
||||
EXPECT_EQ(mq_runtime_publish_message(env->rt, env->topic_id, (void*)(long)env->N_round), 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
EXPECT_EQ((long)msg, env->N_round);
|
||||
}
|
||||
mq_runtime_dispatch(env->rt);
|
||||
return;
|
||||
}
|
||||
|
||||
TEST(mq_runtime, call_dispatch_on_msg_dispatch)
|
||||
{
|
||||
struct test_dispatch_on_msg_env env={};
|
||||
env.s=mq_schema_new();
|
||||
EXPECT_TRUE(env.s!=NULL);
|
||||
env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_dispatch_on_msg_free , &env);
|
||||
EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_dispatch_on_msg, &env), 0);
|
||||
|
||||
env.N_round=10;
|
||||
env.rt=mq_runtime_new(env.s);
|
||||
EXPECT_TRUE(env.rt!=NULL);
|
||||
for(int i=0; i<env.N_round;i++)
|
||||
{
|
||||
env.current_round=i;
|
||||
EXPECT_EQ(mq_runtime_publish_message(env.rt, env.topic_id, (void *)(long)i), 0);
|
||||
mq_runtime_dispatch(env.rt);
|
||||
}
|
||||
|
||||
mq_runtime_free(env.rt);
|
||||
mq_schema_free(env.s);
|
||||
|
||||
EXPECT_EQ(env.on_msg_free_called, env.N_round*2);
|
||||
EXPECT_EQ(env.on_msg_called, env.N_round*2);
|
||||
}
|
||||
|
||||
/**********************************************
|
||||
@@ -360,127 +482,6 @@ TEST(mq_runtime, sub_with_dispatch_cb) {
|
||||
EXPECT_EQ(env.sess_dispatch_called, env.N_session);
|
||||
EXPECT_EQ(env.sess_msg_free_called, env.N_session);
|
||||
}
|
||||
#if 0
|
||||
//TODO: test case mq for overlimit
|
||||
static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
|
||||
{
|
||||
struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
|
||||
env->msg_free_cnt+=1;
|
||||
FREE(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env)
|
||||
{
|
||||
struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
|
||||
EXPECT_TRUE(env!=NULL);
|
||||
env->msg_sub_cnt+=1;
|
||||
return;
|
||||
}
|
||||
|
||||
static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env)
|
||||
{
|
||||
struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
|
||||
EXPECT_TRUE(env!=NULL);
|
||||
//EXPECT_EQ(pkt->ip_proto, ip_protocol);
|
||||
int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
|
||||
unsigned int cnt=0;
|
||||
int *msg;
|
||||
for(int i=0; i<topic_id_num; i++)
|
||||
{
|
||||
for(unsigned int j=0; j < env->plug_mgr->max_message_dispatch; j++)
|
||||
{
|
||||
msg=CALLOC(int, 1);
|
||||
*msg=cnt;
|
||||
int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg);
|
||||
if(cnt < env->plug_mgr->max_message_dispatch)
|
||||
{
|
||||
ASSERT_EQ(pub_ret, 0);
|
||||
env->msg_pub_cnt+=1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ASSERT_EQ(pub_ret, -1);
|
||||
}
|
||||
if(pub_ret!=0)FREE(msg);
|
||||
cnt+=1;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
TEST(mq_runtime, packet_plugins_pub_overlimit) {
|
||||
|
||||
struct stellar st={0};
|
||||
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
|
||||
whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
|
||||
|
||||
unsigned char ip_proto=6;
|
||||
struct packet_plugin_env env;
|
||||
memset(&env, 0, sizeof(struct packet_plugin_env));
|
||||
env.plug_mgr=plug_mgr;
|
||||
char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
|
||||
|
||||
int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
|
||||
|
||||
for(int i=0; i<topic_id_num; i++)
|
||||
{
|
||||
sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
|
||||
env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL,NULL,overlimit_packet_msg_free_cb_func, &env);
|
||||
EXPECT_GE(env.packet_topic_id[i], 0);
|
||||
{
|
||||
SCOPED_TRACE("White-box test, check stellar internal schema");
|
||||
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
|
||||
plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]);
|
||||
EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func);
|
||||
EXPECT_EQ(topic->free_cb_arg, &env);
|
||||
EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
|
||||
EXPECT_STREQ(topic->topic_name, topic_name[i]);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("White-box test, check stellar internal schema");
|
||||
EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
|
||||
}
|
||||
|
||||
int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env);
|
||||
EXPECT_GE(pub_plugin_id, 0);
|
||||
|
||||
int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
|
||||
|
||||
for (int i = 0; i < topic_sub_num; i++)
|
||||
{
|
||||
env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok
|
||||
EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0);
|
||||
for(int j = 0; j < topic_id_num; j++)
|
||||
{
|
||||
EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("White-box test, check stellar internal schema");
|
||||
EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
|
||||
}
|
||||
|
||||
struct packet pkt={&st, IPv4, ip_proto};
|
||||
|
||||
int N_packet=10;
|
||||
for (int i = 0; i < N_packet; i++)
|
||||
{
|
||||
plugin_manager_on_packet_input(plug_mgr, &pkt);
|
||||
plugin_manager_on_packet_output(plug_mgr, &pkt);
|
||||
}
|
||||
|
||||
plugin_manager_exit(plug_mgr);
|
||||
EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt);
|
||||
EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
|
||||
EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
struct test_priority_mq_env
|
||||
{
|
||||
@@ -617,6 +618,127 @@ TEST(mq_runtime, basic_mq_priority) {
|
||||
EXPECT_EQ(env.plugin_id_2_called,env.N_round*3);
|
||||
}
|
||||
|
||||
#if 0
|
||||
//TODO: test case mq for overlimit
|
||||
static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
|
||||
{
|
||||
struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
|
||||
env->msg_free_cnt+=1;
|
||||
FREE(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env)
|
||||
{
|
||||
struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
|
||||
EXPECT_TRUE(env!=NULL);
|
||||
env->msg_sub_cnt+=1;
|
||||
return;
|
||||
}
|
||||
|
||||
static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env)
|
||||
{
|
||||
struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
|
||||
EXPECT_TRUE(env!=NULL);
|
||||
//EXPECT_EQ(pkt->ip_proto, ip_protocol);
|
||||
int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
|
||||
unsigned int cnt=0;
|
||||
int *msg;
|
||||
for(int i=0; i<topic_id_num; i++)
|
||||
{
|
||||
for(unsigned int j=0; j < env->plug_mgr->max_message_dispatch; j++)
|
||||
{
|
||||
msg=CALLOC(int, 1);
|
||||
*msg=cnt;
|
||||
int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg);
|
||||
if(cnt < env->plug_mgr->max_message_dispatch)
|
||||
{
|
||||
ASSERT_EQ(pub_ret, 0);
|
||||
env->msg_pub_cnt+=1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ASSERT_EQ(pub_ret, -1);
|
||||
}
|
||||
if(pub_ret!=0)FREE(msg);
|
||||
cnt+=1;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
TEST(mq_runtime, pub_overlimit) {
|
||||
|
||||
struct stellar st={0};
|
||||
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
|
||||
whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
|
||||
|
||||
unsigned char ip_proto=6;
|
||||
struct packet_plugin_env env;
|
||||
memset(&env, 0, sizeof(struct packet_plugin_env));
|
||||
env.plug_mgr=plug_mgr;
|
||||
char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
|
||||
|
||||
int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
|
||||
|
||||
for(int i=0; i<topic_id_num; i++)
|
||||
{
|
||||
sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
|
||||
env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL,NULL,overlimit_packet_msg_free_cb_func, &env);
|
||||
EXPECT_GE(env.packet_topic_id[i], 0);
|
||||
{
|
||||
SCOPED_TRACE("White-box test, check stellar internal schema");
|
||||
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
|
||||
plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]);
|
||||
EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func);
|
||||
EXPECT_EQ(topic->free_cb_arg, &env);
|
||||
EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
|
||||
EXPECT_STREQ(topic->topic_name, topic_name[i]);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("White-box test, check stellar internal schema");
|
||||
EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
|
||||
}
|
||||
|
||||
int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env);
|
||||
EXPECT_GE(pub_plugin_id, 0);
|
||||
|
||||
int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
|
||||
|
||||
for (int i = 0; i < topic_sub_num; i++)
|
||||
{
|
||||
env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok
|
||||
EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0);
|
||||
for(int j = 0; j < topic_id_num; j++)
|
||||
{
|
||||
EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
SCOPED_TRACE("White-box test, check stellar internal schema");
|
||||
EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
|
||||
}
|
||||
|
||||
struct packet pkt={&st, IPv4, ip_proto};
|
||||
|
||||
int N_packet=10;
|
||||
for (int i = 0; i < N_packet; i++)
|
||||
{
|
||||
plugin_manager_on_packet_input(plug_mgr, &pkt);
|
||||
plugin_manager_on_packet_output(plug_mgr, &pkt);
|
||||
}
|
||||
|
||||
plugin_manager_exit(plug_mgr);
|
||||
EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt);
|
||||
EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
|
||||
EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
/**********************************************
|
||||
|
||||
Reference in New Issue
Block a user