From 17b537b19ad41326f1f0fc6fffcee2d04ef14405 Mon Sep 17 00:00:00 2001 From: yangwei Date: Thu, 19 Sep 2024 15:58:39 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=A7=AA=20test(mq):=20add=20priority=20tes?= =?UTF-8?q?t=20case?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- infra/mq/mq.c | 4 +- infra/mq/mq_internal.h | 1 + infra/mq/test/gtest_mq_main.cpp | 221 +++++++++++++++----------------- 3 files changed, 106 insertions(+), 120 deletions(-) diff --git a/infra/mq/mq.c b/infra/mq/mq.c index 76ae081..c4b315c 100644 --- a/infra/mq/mq.c +++ b/infra/mq/mq.c @@ -195,7 +195,7 @@ int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_ms return 0; } -static int mq_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_priority priority) +int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_priority priority) { if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1; @@ -212,7 +212,7 @@ static int mq_publish_message_with_priority(struct mq_runtime *rt, int topic_id, int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *data) { - return mq_publish_message_with_priority(rt, topic_id, data, STELLAR_MQ_PRIORITY_MEDIUM); + return mq_runtime_publish_message_with_priority(rt, topic_id, data, STELLAR_MQ_PRIORITY_MEDIUM); } struct mq_schema *mq_schema_new() diff --git a/infra/mq/mq_internal.h b/infra/mq/mq_internal.h index bbcbb0c..c246894 100644 --- a/infra/mq/mq_internal.h +++ b/infra/mq/mq_internal.h @@ -66,6 +66,7 @@ struct mq_runtime struct mq_message *dealth_letter_queue;// dlq list }; +int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_priority priority); #ifdef __cplusplus } diff --git a/infra/mq/test/gtest_mq_main.cpp b/infra/mq/test/gtest_mq_main.cpp index c09dd96..423b41d 100644 --- a/infra/mq/test/gtest_mq_main.cpp +++ b/infra/mq/test/gtest_mq_main.cpp @@ -4,6 +4,8 @@ #include "mq/mq_internal.h" +#include "stellar/utils.h" + #define TOPIC_NAME_MAX 512 /******************************************* @@ -358,8 +360,8 @@ TEST(mq_runtime, sub_with_dispatch_cb) { EXPECT_EQ(env.sess_dispatch_called, env.N_session); EXPECT_EQ(env.sess_msg_free_called, env.N_session); } - #if 0 +//TODO: test case mq for overlimit static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg) { struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; @@ -406,7 +408,6 @@ static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env) } return; } -//TODO: test case mq for overlimit TEST(mq_runtime, packet_plugins_pub_overlimit) { @@ -479,160 +480,144 @@ TEST(mq_runtime, packet_plugins_pub_overlimit) { } -//TODO: test case, mq priority +#endif + +struct test_priority_mq_env +{ + struct mq_schema *s; + struct mq_runtime *rt; + int N_round; + int current_round; + int tcp_topic_id; + int test_mq_topic_id; + int plugin_id_1_called; + int plugin_id_2_called; + +}; //test dettach session -static void test_session_mq_priority_plugin_1_on_msg(int topic_id, const void *msg, void *plugin_env) +static void test_session_mq_priority_plugin_1_on_msg(int topic_id, void *msg, void *plugin_env) { - struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env; env->plugin_id_1_called+=1; - if(topic_id == env->intrinsc_tcp_topic_id) + if(topic_id == env->tcp_topic_id) { - struct session *sess = (struct session *)msg; - struct test_session_called_ctx *ctx = - (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_1_id); - if (ctx == NULL) - { - ctx = CALLOC(struct test_session_called_ctx, 1); - session_exdata_set(sess, env->exdata_ctx_1_id, ctx); - } - ctx->called+=1; - EXPECT_EQ(ctx->called%3, 1);// intrinsc msg has high priority - EXPECT_EQ(stellar_mq_publish_message_with_priority(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, STELLAR_MQ_PRIORITY_LOW), 0); + EXPECT_EQ(env->plugin_id_1_called%3, 1);// tcp msg has high priority + if((long)msg%2==0) + { + EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_LOW), 0); + } + else + { + EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_HIGH), 0); + } } if(topic_id == env->test_mq_topic_id) { - if(ctx->called%3 == 2) - { - EXPECT_EQ((int)(long)msg, env->plugin_id_2); - } - if(ctx->called%3 == 0) - { - EXPECT_EQ((int )(long)msg, env->plugin_id_1); - } - } + if (env->current_round % 2 == 0) + { + if (env->plugin_id_1_called % 3 == 2) + { + EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority + } + if (env->plugin_id_1_called % 3 == 0) + { + EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority + } + } + else + { + if (env->plugin_id_1_called % 3 == 2) + { + EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority + } + if (env->plugin_id_1_called % 3 == 0) + { + EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority + } + } + } return; } -static void test_session_mq_priority_plugin_2_on_msg(int topic_id, const void *msg, void *plugin_env) +static void test_session_mq_priority_plugin_2_on_msg(int topic_id, void *msg, void *plugin_env) { - struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env; env->plugin_id_2_called+=1; - - if(topic_id == env->intrinsc_tcp_topic_id) + if(topic_id == env->tcp_topic_id) { - struct session *sess = (struct session *)msg; - struct test_session_called_ctx *ctx = - (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_2_id); - if (ctx == NULL) - { - ctx = CALLOC(struct test_session_called_ctx, 1); - session_exdata_set(sess, env->exdata_ctx_2_id, ctx); - } - ctx->called+=1; - EXPECT_EQ(ctx->called % 3, 1); - // publish msg has normal priority - EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_2), 0); + EXPECT_EQ(env->plugin_id_2_called % 3, 1); + // tcp msg has normal priority + EXPECT_EQ(mq_runtime_publish_message(env->rt, env->test_mq_topic_id, (void *)(long)2), 0); } if(topic_id == env->test_mq_topic_id) { - if(ctx->called%3 == 2) - { - EXPECT_EQ((int)(long)msg, env->plugin_id_2); - } - if(ctx->called%3 == 0) - { - EXPECT_EQ((int)(long)msg, env->plugin_id_1); - } - } + if (env->current_round % 2 == 0) + { + if (env->plugin_id_2_called % 3 == 2) + { + EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority + } + if (env->plugin_id_2_called % 3 == 0) + { + EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority + } + } + else + { + if (env->plugin_id_2_called % 3 == 2) + { + EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority + } + if (env->plugin_id_2_called % 3 == 0) + { + EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority + } + } + } return; } -TEST(mq_runtime, test_session_mq_priority) { +TEST(mq_runtime, basic_mq_priority) { - struct stellar st={0}; - struct session_plugin_env env; - memset(&env, 0, sizeof(struct session_plugin_env)); + struct test_priority_mq_env env={}; -// mock init stage - struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); - whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + env.s=mq_schema_new(); -// plugin manager register plugin + env.tcp_topic_id=mq_schema_create_topic(env.s, TOPIC_TCP, NULL, NULL, NULL, &env); + EXPECT_GE(env.tcp_topic_id, 0); + + EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0); + EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0); - int plugin_id_1=stellar_plugin_register(&st, 0,NULL, NULL, &env); - EXPECT_GE(plugin_id_1,0); - - int plugin_id_2=stellar_plugin_register(&st, 0, NULL, NULL, &env); - EXPECT_GE(plugin_id_2,0); - - env.plugin_id_1=plugin_id_1; - env.plugin_id_2=plugin_id_2; - - env.exdata_ctx_1_id=stellar_exdata_new_index(&st, "SESSION_CTX_1", stellar_exdata_free_default, &env) ; - env.exdata_ctx_2_id=stellar_exdata_new_index(&st, "SESSION_CTX_2", stellar_exdata_free_default, &env) ; - - env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); - EXPECT_GE(env.intrinsc_tcp_topic_id, 0); - EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); - EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); - - env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env); + env.test_mq_topic_id=mq_schema_create_topic(env.s, "SESSION_PRIORITY_TOPIC", NULL, NULL, NULL, &env); EXPECT_GE(env.test_mq_topic_id, 0); - EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); - EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); + EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0); + EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0); // mock packet and session - env.plug_mgr=plug_mgr; - env.N_per_session_pkt_cnt=10; - env.N_session=10; + env.rt=mq_runtime_new(env.s); + env.N_round=10; - struct packet pkt={&st, TCP, 6}; - - struct session sess[env.N_session]; - memset(&sess, 0, sizeof(sess)); - -// mock running stage - for(int i=0; i < env.N_session; i++) - { - sess[i].state=SESSION_STATE_OPENING; - sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr); - sess[i].type=SESSION_TYPE_TCP; - } - - for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + for (int i = 0; i < env.N_round; i++) { - plugin_manager_on_packet_input(plug_mgr, &pkt); + env.current_round=i; + mq_runtime_publish_message(env.rt, env.tcp_topic_id, (void *)(long)i); + mq_runtime_dispatch(env.rt); - for (int i = 0; i < env.N_session; i++) - { - sess[i].sess_pkt_cnt+=1; - sess[i].state=SESSION_STATE_ACTIVE; - stellar_mq_publish_message(&st, env.intrinsc_tcp_topic_id, &sess[i]); - } - - plugin_manager_on_packet_output(plug_mgr, &pkt); } - for(int i=0; i < env.N_session; i++) - { - sess[i].state=SESSION_STATE_CLOSING; - session_exdata_runtime_free(sess[i].session_exdat_rt); - } - -// mock exit stage - plugin_manager_exit(plug_mgr); - - // each session publish TCP TOPIC per_session_pkt_cnt+1, and SESSION_PRIORITY_TOPIC 2*(msg per_session_pkt_cnt+1) - EXPECT_EQ(env.plugin_id_1_called,env.N_session*((env.N_per_session_pkt_cnt)*3)); - EXPECT_EQ(env.plugin_id_2_called,env.N_session*((env.N_per_session_pkt_cnt)*3)); + mq_runtime_free(env.rt); + mq_schema_free(env.s); + // publish TCP TOPIC N_round, and SESSION_PRIORITY_TOPIC*2 + EXPECT_EQ(env.plugin_id_1_called,env.N_round*3); + EXPECT_EQ(env.plugin_id_2_called,env.N_round*3); } -#endif /********************************************** * GTEST MAIN *