From 7305c64a8cfe66f37c335457cf4fbbd8a9e95e6a Mon Sep 17 00:00:00 2001 From: yangwei Date: Sat, 14 Sep 2024 16:44:36 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=A7=AA=20test(mq):=20add=20dispatch=5Fcb?= =?UTF-8?q?=20test=20case?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- infra/mq/test/gtest_mq_main.cpp | 96 +++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/infra/mq/test/gtest_mq_main.cpp b/infra/mq/test/gtest_mq_main.cpp index 3b9dc77..23d93f2 100644 --- a/infra/mq/test/gtest_mq_main.cpp +++ b/infra/mq/test/gtest_mq_main.cpp @@ -301,6 +301,102 @@ TEST(plugin_manager, basic_pub_sub) { mq_schema_free(s); } +/********************************************** + * MQ RUNTIME WITH DISPATCH * + **********************************************/ + +struct session +{ + int id; + int cnt; +}; + +struct session_mq_test_env +{ + struct mq_schema *s; + struct mq_runtime *rt; + int N_session; + struct session sess[1024]; + int intrinsc_tcp_input_topic_id; + int basic_on_tcp_called; + int sess_dispatch_called; + int test_mq_sub_called; + int sess_msg_free_called; +}; + +#define TOPIC_TCP "TCP" +typedef void on_session_msg_cb_func(int topic_id, struct session *sess, void *module_ctx); + +static void pesudo_on_msg_dispatch(int topic_id, + void *msg, + on_msg_cb_func* on_msg_cb, + void *on_msg_cb_arg, + void *dispatch_arg) +{ + on_session_msg_cb_func *session_cb = (on_session_msg_cb_func *)on_msg_cb; + struct session *sess=(struct session *)msg; + EXPECT_TRUE(dispatch_arg==NULL); + session_cb(topic_id, sess, on_msg_cb_arg); + struct session_mq_test_env *env=(struct session_mq_test_env *)on_msg_cb_arg; + env->sess_dispatch_called+=1; +} + +static void pesudo_tcp_session_msg_free(void *msg, void *msg_free_arg) +{ + struct session_mq_test_env *env=(struct session_mq_test_env *)msg_free_arg; + env->sess_msg_free_called+=1; + +} +static int pesudo_tcp_session_subscribe(struct session_mq_test_env *env, on_session_msg_cb_func *on_session_cb) +{ + int topic_id=mq_schema_get_topic_id(env->s, TOPIC_TCP); + if(topic_id<0) + { + topic_id=mq_schema_create_topic(env->s, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, pesudo_tcp_session_msg_free, env); + } + return mq_schema_subscribe(env->s, topic_id, (on_msg_cb_func *)on_session_cb, env); +} + +static void test_basic_on_tcp_session(int topic_id, struct session *sess, void *plugin_env) +{ + struct session_mq_test_env *env = (struct session_mq_test_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + if(sess) + { + env->basic_on_tcp_called+=1; + } + return; +} + +TEST(mq_runtime, sub_with_dispatch_cb) { + + struct session_mq_test_env env; + memset(&env, 0, sizeof(env)); + env.N_session=10; + + env.s=mq_schema_new(); + + EXPECT_EQ(pesudo_tcp_session_subscribe(&env, test_basic_on_tcp_session), 0); + env.intrinsc_tcp_input_topic_id=mq_schema_get_topic_id(env.s, TOPIC_TCP); + + env.rt=mq_runtime_new(env.s); + + for(int i=0; i