🧪 test(mq): add priority test case

This commit is contained in:
yangwei
2024-09-19 15:58:39 +08:00
parent 6af61355e3
commit 17b537b19a
3 changed files with 106 additions and 120 deletions

View File

@@ -4,6 +4,8 @@
#include "mq/mq_internal.h"
#include "stellar/utils.h"
#define TOPIC_NAME_MAX 512
/*******************************************
@@ -358,8 +360,8 @@ TEST(mq_runtime, sub_with_dispatch_cb) {
EXPECT_EQ(env.sess_dispatch_called, env.N_session);
EXPECT_EQ(env.sess_msg_free_called, env.N_session);
}
#if 0
//TODO: test case mq for overlimit
static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
{
struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
@@ -406,7 +408,6 @@ static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env)
}
return;
}
//TODO: test case mq for overlimit
TEST(mq_runtime, packet_plugins_pub_overlimit) {
@@ -479,160 +480,144 @@ TEST(mq_runtime, packet_plugins_pub_overlimit) {
}
//TODO: test case, mq priority
#endif
struct test_priority_mq_env
{
struct mq_schema *s;
struct mq_runtime *rt;
int N_round;
int current_round;
int tcp_topic_id;
int test_mq_topic_id;
int plugin_id_1_called;
int plugin_id_2_called;
};
//test dettach session
static void test_session_mq_priority_plugin_1_on_msg(int topic_id, const void *msg, void *plugin_env)
static void test_session_mq_priority_plugin_1_on_msg(int topic_id, void *msg, void *plugin_env)
{
struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env;
env->plugin_id_1_called+=1;
if(topic_id == env->intrinsc_tcp_topic_id)
if(topic_id == env->tcp_topic_id)
{
struct session *sess = (struct session *)msg;
struct test_session_called_ctx *ctx =
(struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_1_id);
if (ctx == NULL)
{
ctx = CALLOC(struct test_session_called_ctx, 1);
session_exdata_set(sess, env->exdata_ctx_1_id, ctx);
}
ctx->called+=1;
EXPECT_EQ(ctx->called%3, 1);// intrinsc msg has high priority
EXPECT_EQ(stellar_mq_publish_message_with_priority(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, STELLAR_MQ_PRIORITY_LOW), 0);
EXPECT_EQ(env->plugin_id_1_called%3, 1);// tcp msg has high priority
if((long)msg%2==0)
{
EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_LOW), 0);
}
else
{
EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_HIGH), 0);
}
}
if(topic_id == env->test_mq_topic_id)
{
if(ctx->called%3 == 2)
{
EXPECT_EQ((int)(long)msg, env->plugin_id_2);
}
if(ctx->called%3 == 0)
{
EXPECT_EQ((int )(long)msg, env->plugin_id_1);
}
}
if (env->current_round % 2 == 0)
{
if (env->plugin_id_1_called % 3 == 2)
{
EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority
}
if (env->plugin_id_1_called % 3 == 0)
{
EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority
}
}
else
{
if (env->plugin_id_1_called % 3 == 2)
{
EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority
}
if (env->plugin_id_1_called % 3 == 0)
{
EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority
}
}
}
return;
}
static void test_session_mq_priority_plugin_2_on_msg(int topic_id, const void *msg, void *plugin_env)
static void test_session_mq_priority_plugin_2_on_msg(int topic_id, void *msg, void *plugin_env)
{
struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env;
env->plugin_id_2_called+=1;
if(topic_id == env->intrinsc_tcp_topic_id)
if(topic_id == env->tcp_topic_id)
{
struct session *sess = (struct session *)msg;
struct test_session_called_ctx *ctx =
(struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_2_id);
if (ctx == NULL)
{
ctx = CALLOC(struct test_session_called_ctx, 1);
session_exdata_set(sess, env->exdata_ctx_2_id, ctx);
}
ctx->called+=1;
EXPECT_EQ(ctx->called % 3, 1);
// publish msg has normal priority
EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_2), 0);
EXPECT_EQ(env->plugin_id_2_called % 3, 1);
// tcp msg has normal priority
EXPECT_EQ(mq_runtime_publish_message(env->rt, env->test_mq_topic_id, (void *)(long)2), 0);
}
if(topic_id == env->test_mq_topic_id)
{
if(ctx->called%3 == 2)
{
EXPECT_EQ((int)(long)msg, env->plugin_id_2);
}
if(ctx->called%3 == 0)
{
EXPECT_EQ((int)(long)msg, env->plugin_id_1);
}
}
if (env->current_round % 2 == 0)
{
if (env->plugin_id_2_called % 3 == 2)
{
EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority
}
if (env->plugin_id_2_called % 3 == 0)
{
EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority
}
}
else
{
if (env->plugin_id_2_called % 3 == 2)
{
EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority
}
if (env->plugin_id_2_called % 3 == 0)
{
EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority
}
}
}
return;
}
TEST(mq_runtime, test_session_mq_priority) {
TEST(mq_runtime, basic_mq_priority) {
struct stellar st={0};
struct session_plugin_env env;
memset(&env, 0, sizeof(struct session_plugin_env));
struct test_priority_mq_env env={};
// mock init stage
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
env.s=mq_schema_new();
// plugin manager register plugin
env.tcp_topic_id=mq_schema_create_topic(env.s, TOPIC_TCP, NULL, NULL, NULL, &env);
EXPECT_GE(env.tcp_topic_id, 0);
EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0);
EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0);
int plugin_id_1=stellar_plugin_register(&st, 0,NULL, NULL, &env);
EXPECT_GE(plugin_id_1,0);
int plugin_id_2=stellar_plugin_register(&st, 0, NULL, NULL, &env);
EXPECT_GE(plugin_id_2,0);
env.plugin_id_1=plugin_id_1;
env.plugin_id_2=plugin_id_2;
env.exdata_ctx_1_id=stellar_exdata_new_index(&st, "SESSION_CTX_1", stellar_exdata_free_default, &env) ;
env.exdata_ctx_2_id=stellar_exdata_new_index(&st, "SESSION_CTX_2", stellar_exdata_free_default, &env) ;
env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0);
env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env);
env.test_mq_topic_id=mq_schema_create_topic(env.s, "SESSION_PRIORITY_TOPIC", NULL, NULL, NULL, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0);
EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0);
EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0);
// mock packet and session
env.plug_mgr=plug_mgr;
env.N_per_session_pkt_cnt=10;
env.N_session=10;
env.rt=mq_runtime_new(env.s);
env.N_round=10;
struct packet pkt={&st, TCP, 6};
struct session sess[env.N_session];
memset(&sess, 0, sizeof(sess));
// mock running stage
for(int i=0; i < env.N_session; i++)
{
sess[i].state=SESSION_STATE_OPENING;
sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr);
sess[i].type=SESSION_TYPE_TCP;
}
for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
for (int i = 0; i < env.N_round; i++)
{
plugin_manager_on_packet_input(plug_mgr, &pkt);
env.current_round=i;
mq_runtime_publish_message(env.rt, env.tcp_topic_id, (void *)(long)i);
mq_runtime_dispatch(env.rt);
for (int i = 0; i < env.N_session; i++)
{
sess[i].sess_pkt_cnt+=1;
sess[i].state=SESSION_STATE_ACTIVE;
stellar_mq_publish_message(&st, env.intrinsc_tcp_topic_id, &sess[i]);
}
plugin_manager_on_packet_output(plug_mgr, &pkt);
}
for(int i=0; i < env.N_session; i++)
{
sess[i].state=SESSION_STATE_CLOSING;
session_exdata_runtime_free(sess[i].session_exdat_rt);
}
// mock exit stage
plugin_manager_exit(plug_mgr);
// each session publish TCP TOPIC per_session_pkt_cnt+1, and SESSION_PRIORITY_TOPIC 2*(msg per_session_pkt_cnt+1)
EXPECT_EQ(env.plugin_id_1_called,env.N_session*((env.N_per_session_pkt_cnt)*3));
EXPECT_EQ(env.plugin_id_2_called,env.N_session*((env.N_per_session_pkt_cnt)*3));
mq_runtime_free(env.rt);
mq_schema_free(env.s);
// publish TCP TOPIC N_round, and SESSION_PRIORITY_TOPIC*2
EXPECT_EQ(env.plugin_id_1_called,env.N_round*3);
EXPECT_EQ(env.plugin_id_2_called,env.N_round*3);
}
#endif
/**********************************************
* GTEST MAIN *