From 364f8c84a355bcd2425781149258fbcd326d95e3 Mon Sep 17 00:00:00 2001 From: yangwei Date: Fri, 13 Sep 2024 19:04:59 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=A7=AA=20test(mq=20test=20case):=20add=20?= =?UTF-8?q?test=20case?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- infra/mq/CMakeLists.txt | 2 +- infra/mq/mq.c | 8 +- infra/mq/{mq_interna.h => mq_internal.h} | 0 infra/mq/test/CMakeLists.txt | 14 + infra/mq/test/gtest_mq_main.cpp | 1619 ++++++++++++++++++++++ 5 files changed, 1640 insertions(+), 3 deletions(-) rename infra/mq/{mq_interna.h => mq_internal.h} (100%) create mode 100644 infra/mq/test/CMakeLists.txt create mode 100644 infra/mq/test/gtest_mq_main.cpp diff --git a/infra/mq/CMakeLists.txt b/infra/mq/CMakeLists.txt index 727073a..655fe76 100644 --- a/infra/mq/CMakeLists.txt +++ b/infra/mq/CMakeLists.txt @@ -1,3 +1,3 @@ add_library(mq mq.c) -#add_subdirectory(test) \ No newline at end of file +add_subdirectory(test) \ No newline at end of file diff --git a/infra/mq/mq.c b/infra/mq/mq.c index 408bfb4..ddc598b 100644 --- a/infra/mq/mq.c +++ b/infra/mq/mq.c @@ -1,4 +1,4 @@ -#include "mq_interna.h" +#include "mq_internal.h" #include "stellar/utils.h" #include "uthash/utlist.h" @@ -226,6 +226,10 @@ void mq_schema_free(struct mq_schema *s) if(s==NULL)return; if(s->topic_array) { + for (unsigned int i = 0; i < utarray_len(s->topic_array); i++) + { + mq_schema_destroy_topic(s, i); + } utarray_free(s->topic_array); } FREE(s); @@ -234,7 +238,7 @@ void mq_schema_free(struct mq_schema *s) struct mq_runtime *mq_runtime_new(struct mq_schema *s) { - if(s==NULL || s->topic_array==NULL)return NULL; + if(s==NULL)return NULL; struct mq_runtime *rt = CALLOC(struct mq_runtime,1); rt->schema=s; return rt; diff --git a/infra/mq/mq_interna.h b/infra/mq/mq_internal.h similarity index 100% rename from infra/mq/mq_interna.h rename to infra/mq/mq_internal.h diff --git a/infra/mq/test/CMakeLists.txt b/infra/mq/test/CMakeLists.txt new file mode 100644 index 0000000..6b72d4e --- /dev/null +++ b/infra/mq/test/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(gtest_mq gtest_mq_main.cpp) + +target_include_directories(gtest_mq PRIVATE ${CMAKE_SOURCE_DIR}/infra/) + +target_link_libraries( + gtest_mq + mq + "-rdynamic" + gtest + gmock +) + +include(GoogleTest) +gtest_discover_tests(gtest_mq) \ No newline at end of file diff --git a/infra/mq/test/gtest_mq_main.cpp b/infra/mq/test/gtest_mq_main.cpp new file mode 100644 index 0000000..9830a33 --- /dev/null +++ b/infra/mq/test/gtest_mq_main.cpp @@ -0,0 +1,1619 @@ +#pragma GCC diagnostic ignored "-Wunused-parameter" + +#include + +#include "mq/mq_internal.h" + +#define TOPIC_NAME_MAX 512 + +#if 0 +void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr) +{ + SCOPED_TRACE("whitebox test intrisic metadata"); + + EXPECT_TRUE(plug_mgr!=NULL); + + EXPECT_EQ(plug_mgr->st, st); + + //load spec null + EXPECT_TRUE(plug_mgr->plugin_load_specs_array==NULL); + + //session exdata schema null + EXPECT_TRUE(plug_mgr->exdata_schema!=NULL); + + //stellar mq schema null + EXPECT_TRUE(plug_mgr->stellar_mq_schema_array==NULL); + + //registered plugin array null + EXPECT_TRUE(plug_mgr->registered_polling_plugin_array==NULL); + EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL); + + EXPECT_TRUE(plug_mgr->per_thread_data!=NULL); + int thread_num=stellar_get_worker_thread_num(st); + for(int i=0; iper_thread_data[i].exdata_array==NULL); + EXPECT_TRUE(plug_mgr->per_thread_data[i].dealth_letter_queue==NULL); + for(int j=0; jper_thread_data[i].priority_mq[j]==NULL); + } +} + +#endif + +TEST(mq_schema, new_and_free) { + + struct mq_schema *s = mq_schema_new(); + EXPECT_TRUE(s!=NULL); + mq_schema_free(s); +} + +TEST(mq_runtime, new_and_free) { + + struct mq_schema *s = mq_schema_new(); + EXPECT_TRUE(s!=NULL); + mq_schema_free(s); + + struct mq_runtime *rt = mq_runtime_new(s); + EXPECT_TRUE(rt!=NULL); + mq_runtime_free(rt); + +} + +void mock_msg_free(void *msg, void *msg_free_arg){} +void mock_overwrite_msg_free(void *msg, void *msg_free_arg){} + +TEST(mq_schema, mq_topic_create_and_update) { + + struct mq_schema *s = mq_schema_new(); + EXPECT_TRUE(s!=NULL); + + const char *topic_name="PACKET_TOPIC"; + EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), -1); // illegal topic_name + + int topic_id = mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s); + EXPECT_GE(topic_id, 0); + + struct mq_topic *topic = NULL; + { + SCOPED_TRACE("White-box test, check mq_schema internal "); + topic = + (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); + EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); + EXPECT_EQ(topic->free_cb_arg, s); + EXPECT_EQ(topic->topic_id, topic_id); + EXPECT_STREQ(topic->topic_name, topic_name); + } + + EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), topic_id); + EXPECT_EQ(mq_schema_create_topic(s, topic_name, NULL, NULL, mock_overwrite_msg_free, s), -1); // duplicate create, return error + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic = + (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); + EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); + EXPECT_EQ(topic->free_cb_arg, s); + EXPECT_EQ(topic->topic_id, topic_id); + EXPECT_STREQ(topic->topic_name, topic_name); + } + + EXPECT_EQ(mq_schema_update_topic(s, topic_id, NULL, NULL, mock_overwrite_msg_free, s), 0); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic = + (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); + EXPECT_EQ(topic->free_cb, (void *)mock_overwrite_msg_free); + EXPECT_EQ(topic->free_cb_arg, s); + EXPECT_EQ(topic->topic_id, topic_id); + EXPECT_STREQ(topic->topic_name, topic_name); + EXPECT_EQ(utarray_len(s->topic_array), 1); + } + + EXPECT_EQ(mq_schema_destroy_topic(s, 10), -1); // illgeal topic_id + + EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 1); + EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 0); // duplicate destroy, return 0; + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(s->topic_array), 1); // destory won't delete the topic schema + } + + mq_schema_free(s); +} + +void test_mock_on_packet_msg(int topic_id, const void *msg, void *plugin_env){} +void test_mock_overwrite_on_packet_msg(int topic_id, const void *msg, void *plugin_env){} + +TEST(mq_schema, subscribe) { + + struct mq_schema *s = mq_schema_new(); + EXPECT_TRUE(s!=NULL); + + const char *topic_name="PACKET_TOPIC"; + + int topic_id=mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s); + EXPECT_GE(topic_id, 0); + + EXPECT_EQ(mq_schema_subscribe(s, 10, test_mock_on_packet_msg, s),-1);//illgeal topic_id + + EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_on_packet_msg, s),0); + EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_overwrite_on_packet_msg, s),0);//duplicate subscribe + + struct mq_topic *topic; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); + EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); + EXPECT_EQ(topic->free_cb_arg, s); + EXPECT_EQ(topic->topic_id, topic_id); + EXPECT_STREQ(topic->topic_name, topic_name); + } + + EXPECT_EQ(topic->subscriber_cnt, 2); + EXPECT_EQ(topic->subscribers->msg_cb, (void *)test_mock_on_packet_msg); + EXPECT_EQ(topic->subscribers->next->msg_cb, (void *)test_mock_overwrite_on_packet_msg); + + mq_schema_free(s); +} + +#if 0 + +void test_mock_on_packet_msg(int topic_id, const void *msg, void *plugin_env){} +void test_mock_overwrite_on_packet_msg(int topic_id, const void *msg, void *plugin_env){} + +TEST(plugin_manager_init, stellar_mq_subscribe) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + + const char *topic_name="PACKET_TOPIC"; + + int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_packet_msg_free, &st); + EXPECT_GE(topic_id, 0); + + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10),-1);//illgeal plugin_id + EXPECT_EQ(stellar_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10),-1);//illgeal topic_id & plugin_id + + int plugin_id=stellar_plugin_register(&st, NULL, NULL,&st); + EXPECT_GE(plugin_id, 0); + + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0); + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_overwrite_on_packet_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite + struct stellar_mq_topic_schema *topic_schema; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(topic_schema->subscriber_cnt, 1); + EXPECT_EQ(topic_schema->subscribers->plugin_msg_cb, (void *)test_mock_overwrite_on_packet_msg); + + plugin_manager_exit(plug_mgr); +} + +void test_mock_session_msg_free(void *msg, void *msg_free_arg){} + +void test_mock_on_session_msg(int topic_id, const void *msg,void *plugin_env){} +void test_mock_overwrite_on_session_msg(int topic_id, const void *msg, void *plugin_env){} + + +TEST(plugin_manager_init, stellar_mq_subscribe_overwrite) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *topic_name="SESSION_TOPIC"; + + int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_session_msg_free, &st); + EXPECT_GE(topic_id, 0); + + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_session_msg, 10),-1);//illgeal plugin_id + EXPECT_EQ(stellar_mq_subscribe(&st, 10, test_mock_on_session_msg, 10),-1);//illgeal topic_id & plugin_id + + int plugin_id=stellar_plugin_register(&st, NULL, NULL, NULL); + EXPECT_GE(plugin_id, 0); + + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_session_msg, plugin_id),0); + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_overwrite_on_session_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite + + struct stellar_mq_topic_schema *topic_schema; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,(unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(topic_schema->subscriber_cnt, 1); + EXPECT_EQ(topic_schema->subscribers->plugin_msg_cb, (void *)test_mock_overwrite_on_session_msg); + + plugin_manager_exit(plug_mgr); +} + +/********************************************** + * TEST PLUGIN MANAGER ON POLLING PLUGIN INIT * + **********************************************/ + +int test_plugin_on_polling_func(void *plugin_env) +{ + return 1; +} + +TEST(plugin_manager_init, polling_plugin_register) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + st.plug_mgr=plug_mgr; + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + int plugin_id = stellar_polling_plugin_register(&st, test_plugin_on_polling_func, &st); + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_TRUE(plugin_id>=0); + struct registered_polling_plugin_schema *schema = (struct registered_polling_plugin_schema *)utarray_eltptr( + plug_mgr->registered_polling_plugin_array, (unsigned int)(plugin_id)); + EXPECT_EQ(schema->on_polling, (void *)test_plugin_on_polling_func); + EXPECT_EQ(schema->plugin_env, &st); + EXPECT_EQ(utarray_len(plug_mgr->registered_polling_plugin_array), 1); + } + + plugin_manager_exit(plug_mgr); +} + +/******************************************* + * TEST PLUGIN MANAGER PACKET PLUGIN RUNTIME* + *******************************************/ + +#define PACKET_PROTO_PLUGIN_NUM 128 +#define PACKET_EXDATA_NUM 2 +#define PACKET_TOPIC_NUM 2 +#define PACKET_MQ_SUB_NUM 2 +struct packet_plugin_env +{ + struct plugin_manager_schema *plug_mgr; + int basic_on_packet_called; + int proto_filter_plugin_id[PACKET_PROTO_PLUGIN_NUM]; + int proto_filter_plugin_called[PACKET_PROTO_PLUGIN_NUM]; + int exdata_set_on_packet_called; + int exdata_get_on_packet_called; + unsigned int packet_exdata_idx[PACKET_EXDATA_NUM]; + int exdata_free_called[PACKET_EXDATA_NUM]; + unsigned int packet_topic_id[PACKET_TOPIC_NUM]; + unsigned int packet_mq_sub_plugin_id[PACKET_MQ_SUB_NUM]; + int msg_pub_cnt; + int msg_sub_cnt; + int msg_free_cnt; +}; + +static void test_basic_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set + EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get + env->basic_on_packet_called+=1; + return; +} + +TEST(plugin_manager, packet_plugin_illegal_exdata) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + int plugin_id=stellar_plugin_register(&st, test_basic_on_packet, NULL,&env); + EXPECT_GE(plugin_id, 0); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + int packet_plugin_num = utarray_len(plug_mgr->registered_packet_plugin_array); + EXPECT_EQ(packet_plugin_num, 1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_on_packet_called, 1); +} + +static void test_proto_filter_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set + EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get + //env->proto_filter_plugin_called[ip_protocol]+=1; + return; +} + +TEST(plugin_manager, DISABLED_packet_plugins_with_proto_filter) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + + int proto_filter_plugin_num=(int)(sizeof(env.proto_filter_plugin_id) / sizeof(env.proto_filter_plugin_id[0])); + for (int i = 0; i < proto_filter_plugin_num; i++) + { + env.proto_filter_plugin_id[i] = stellar_plugin_register(&st, test_proto_filter_on_packet, NULL,&env); + EXPECT_GE(env.proto_filter_plugin_id[i], 0); + + + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), proto_filter_plugin_num); + } + + struct packet pkt={&st, IPv4, 0}; + + int N_packet=10; + for (int j = 0; j < N_packet; j++) + { + for (int i = 0; i < proto_filter_plugin_num; i++) + { + pkt.ip_proto = i; + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + } + plugin_manager_exit(plug_mgr); + + for (int i = 0; i < proto_filter_plugin_num; i++) + { + EXPECT_EQ(env.proto_filter_plugin_called[i], N_packet); + } +} + + +struct test_exdata +{ + struct packet *pkt; + struct session *sess; + long long value; +}; + +static void test_exdata_set_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + env->exdata_set_on_packet_called+=1; + + int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0])); + + for(int i=0; ivalue=i; + exdata_val->pkt=pkt; + EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[i], exdata_val), 0); + } + return; +} + +static void test_exdata_get_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + + int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0])); + + for(int i=0; ipacket_exdata_idx[i]); + EXPECT_EQ(exdata_val->value, i); + } + env->exdata_get_on_packet_called+=1; + return; +} + +static void test_packet_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)arg; + struct test_exdata *exdata_val=(struct test_exdata *)ex_ptr; + EXPECT_EQ(env->packet_exdata_idx[idx], idx); + EXPECT_EQ(exdata_val->value, idx); + + EXPECT_EQ(packet_exdata_get(exdata_val->pkt, idx), nullptr);// illegal get in exdata_free callback + EXPECT_EQ(packet_exdata_set(exdata_val->pkt, idx, exdata_val->pkt), -1);// illegal set in exdata_free callback + + FREE(ex_ptr); + env->exdata_free_called[idx]+=1; + + return; +} + + +TEST(plugin_manager, packet_plugins_share_exdata) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + + char exdata_name[PACKET_EXDATA_NUM][TOPIC_NAME_MAX]; + int exdata_idx_len=(int)(sizeof(env.packet_exdata_idx) / sizeof(env.packet_exdata_idx[0])); + for(int i=0; iexdata_schema->exdata_meta_array, env.packet_exdata_idx[i]); + + EXPECT_EQ(exdata_schema->free_func, (void *)test_packet_exdata_free); + EXPECT_EQ(exdata_schema->free_arg, &env); + EXPECT_EQ(exdata_schema->idx, env.packet_exdata_idx[i]); + EXPECT_STREQ(exdata_schema->name, exdata_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->exdata_schema->exdata_meta_array), exdata_idx_len); + } + + int exdata_set_plugin_id=stellar_plugin_register(&st, test_exdata_set_on_packet, NULL,&env); + EXPECT_GE(exdata_set_plugin_id, 0); + + int exdata_get_plugin_id=stellar_plugin_register(&st, test_exdata_get_on_packet, NULL,&env); + EXPECT_GE(exdata_get_plugin_id, 0); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), 2); // Fix plugin number + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + + for(int i=0; i < N_packet; i++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.exdata_set_on_packet_called, N_packet); + EXPECT_EQ(env.exdata_get_on_packet_called, N_packet); + + for(int i=0; i < exdata_idx_len; i++) + { + EXPECT_EQ(env.exdata_free_called[i], N_packet); + } +} + +static void test_packet_msg_free_cb_func(void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + return; +} + +static void test_mq_on_packet_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + env->msg_sub_cnt+=1; + return; +} + +static void test_mq_pub_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); + for(int i=0; iplug_mgr->st, env->packet_topic_id[i], pkt), 0); + env->msg_pub_cnt+=1; + } + return; +} + +TEST(plugin_manager, packet_plugins_mq_pub_sub) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; + + int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); + + for(int i=0; istellar_mq_schema_array, env.packet_topic_id[i]); + EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func); + EXPECT_EQ(topic->free_cb_arg, &env); + EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); + EXPECT_STREQ(topic->topic_name, topic_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); + } + + int pub_plugin_id=stellar_plugin_register(&st, test_mq_pub_on_packet, NULL,&env); + EXPECT_GE(pub_plugin_id, 0); + + int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); + + for (int i = 0; i < topic_sub_num; i++) + { + env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL,&env);// empty on_packet is ok + EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0); + for(int j = 0; j < topic_id_num; j++) + { + EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], test_mq_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + for (int i = 0; i < N_packet; i++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + plugin_manager_exit(plug_mgr); + EXPECT_EQ(N_packet*topic_id_num, env.msg_pub_cnt); + EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); + EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); +} + +static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + FREE(msg); + return; +} + +static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + env->msg_sub_cnt+=1; + return; +} + +static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); + unsigned int cnt=0; + int *msg; + for(int i=0; iplug_mgr->max_message_dispatch; j++) + { + msg=CALLOC(int, 1); + *msg=cnt; + int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg); + if(cnt < env->plug_mgr->max_message_dispatch) + { + ASSERT_EQ(pub_ret, 0); + env->msg_pub_cnt+=1; + } + else + { + ASSERT_EQ(pub_ret, -1); + } + if(pub_ret!=0)FREE(msg); + cnt+=1; + } + } + return; +} + +TEST(plugin_manager, packet_plugins_pub_overlimit) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; + + int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); + + for(int i=0; istellar_mq_schema_array, env.packet_topic_id[i]); + EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func); + EXPECT_EQ(topic->free_cb_arg, &env); + EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); + EXPECT_STREQ(topic->topic_name, topic_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); + } + + int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env); + EXPECT_GE(pub_plugin_id, 0); + + int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); + + for (int i = 0; i < topic_sub_num; i++) + { + env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok + EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0); + for(int j = 0; j < topic_id_num; j++) + { + EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + for (int i = 0; i < N_packet; i++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + plugin_manager_exit(plug_mgr); + EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt); + EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); + EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); +} + + + +static void test_exdata_free_pub_msg_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)arg; + EXPECT_EQ(env->packet_exdata_idx[idx], idx); + env->exdata_free_called[idx]+=1; + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[0], (struct packet *)ex_ptr), -1);// publish message in packet exdata_free is illegal + env->msg_pub_cnt+=1; + return; +} + +static void test_exdata_free_pub_msg_free( void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[0], msg), -1 );// publish message in packet msg_free is illegal + return; +} + +static void test_exdata_free_pub_msg_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[0], pkt), 0); + env->basic_on_packet_called+=1; + return; +} + +static void test_exdata_free_pub_msg_on_packet_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_EQ(topic_id, env->packet_topic_id[0]); + env->msg_sub_cnt+=1; +} + +TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + int plugin_id=stellar_plugin_register(&st, test_exdata_free_pub_msg_on_packet, NULL,&env); + EXPECT_GE(plugin_id, 0); + + env.packet_exdata_idx[0]=stellar_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env); + env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", NULL, NULL, test_exdata_free_pub_msg_free, &env); + + EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0); + + + struct packet pkt={&st, IPv4, ip_proto}; + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_on_packet_called, 1); + EXPECT_EQ(env.msg_pub_cnt, 1); + EXPECT_EQ(env.msg_sub_cnt, 0); + EXPECT_EQ(env.msg_free_cnt, 0); + EXPECT_EQ(env.exdata_free_called[0], 1); +} + + +/********************************************** + * PESUDO SESSION MANAGER PLUGIN API * + **********************************************/ + +struct session_manager_plugin_env +{ + struct stellar *st; + int N_session; + int N_per_session_pkt_cnt; + struct session sess[1024]; + + int session_manager_plugin_id; + + int intrinsc_tcp_input_topic_id; + int intrinsc_tcp_output_topic_id; + int intrinsc_tcp_stream_topic_id; + + int intrinsc_udp_input_topic_id; + int intrinsc_udp_output_topic_id; + + int basic_exdata_idx; + int basic_exdata_free_called; + int basic_on_tcp_called; + int basic_ctx_new_called; + int basic_ctx_free_called; + int test_mq_pub_plugin_id; + int test_mq_sub_plugin_id; + int test_mq_pub_called; + int test_mq_sub_called; + int test_mq_free_called; + int test_mq_topic_id; + int plugin_id_1; + int plugin_id_2; + int plugin_id_1_called; + int plugin_id_2_called; + int exdata_ctx_1_id; + int exdata_ctx_2_id; +}; + +typedef void on_session_msg_cb_func(int topic_id, struct session *sess, void *plugin_env); + +static void pesudo_on_msg_dispatch(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *dispatch_arg, void *sub_plugin_env) +{ + on_session_msg_cb_func *session_cb = (on_session_msg_cb_func *)on_msg_cb; + struct session *sess=(struct session *)msg; + session_cb(topic_id, sess, sub_plugin_env); +} +static void pesudo_on_packet_input(struct packet *pkt, void *plugin_env) +{ + struct session_manager_plugin_env *env=(struct session_manager_plugin_env *)plugin_env; + for (int i = 0; i < env->N_session; i++) + { + stellar_mq_publish_message(env->st, env->intrinsc_tcp_input_topic_id, &env->sess[i]); + } +} + +static void pesudo_on_packet_output(struct packet *pkt, void *plugin_env) +{ + struct session_manager_plugin_env *env=(struct session_manager_plugin_env *)plugin_env; + for (int i = 0; i < env->N_session; i++) + { + stellar_mq_publish_message(env->st, env->intrinsc_tcp_output_topic_id, &env->sess[i]); + } +} + +static void pesudo_session_load(struct stellar *st, struct session_manager_plugin_env *env) +{ + env->st=st; + for(int i=0; i N_session; i++) + { + env->sess[i].session_exdat_rt=session_exdata_runtime_new(st); + env->sess[i].type=SESSION_TYPE_TCP; + } + env->intrinsc_tcp_input_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL, env); + + env->session_manager_plugin_id=stellar_plugin_register(st, pesudo_on_packet_input, pesudo_on_packet_output, env); +} + +void pesudo_session_unload(struct stellar *st, struct session_manager_plugin_env *env) +{ + for(int i=0; i N_session; i++) + { + session_exdata_runtime_free(env->sess[i].session_exdat_rt); + } +} + +static int pesudo_tcp_session_subscribe(struct stellar *st, on_session_msg_cb_func *on_session_cb, int plugin_id) +{ + int topic_id=stellar_mq_get_topic_id(st, TOPIC_TCP); + if(topic_id<0) + { + topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL, NULL); + } + return stellar_mq_subscribe(st, topic_id, (on_msg_cb_func *)on_session_cb, plugin_id); +} + +/********************************************** + * SESSION MANAGER PLUGIN RUNTIME * + **********************************************/ + + +TEST(plugin_manager, no_plugin_register_runtime) { + + struct stellar st={0}; + +// init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + +// prepare packet and session + + struct session_manager_plugin_env env; + memset(&env, 0, sizeof(struct session_manager_plugin_env)); + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + +// load session manager plugin + pesudo_session_load(&st, &env); + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + + plugin_manager_on_packet_output(plug_mgr, &pkt); + + } + +// unload session manager plugin + pesudo_session_unload(&st, &env); + +//exit stage + plugin_manager_exit(plug_mgr); +} + + +static void test_basic_on_tcp_session(int topic_id, struct session *sess, void *plugin_env) +{ + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + if(sess) + { + EXPECT_EQ(session_exdata_set(sess, 2, sess), -1);// illegal set + EXPECT_EQ(session_exdata_get(sess, 2), nullptr);// illegal get + long long called = (long long )session_exdata_get(sess, env->basic_exdata_idx); + EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, (void *)(called+1)), 0); + env->basic_on_tcp_called+=1; + } + return; +} + +static void test_basic_session_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)arg; + EXPECT_EQ(env->basic_exdata_idx, idx); + EXPECT_EQ((long long )ex_ptr, env->N_per_session_pkt_cnt); + + env->basic_exdata_free_called+=1; +} + +TEST(plugin_manager, session_plugin_on_tcp) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL,MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct session_manager_plugin_env env; + memset(&env, 0, sizeof(env)); + env.N_per_session_pkt_cnt=10; + env.N_session=1; + + int plugin_id=stellar_plugin_register(&st, NULL, NULL, &env); + EXPECT_GE(plugin_id, 0); + + EXPECT_EQ(pesudo_tcp_session_subscribe(&st, test_basic_on_tcp_session, plugin_id), 0); + + env.basic_exdata_idx=stellar_exdata_new_index(&st, "SESSION_EXDATA", test_basic_session_exdata_free,&env); + EXPECT_GE(env.basic_exdata_idx, 0); + + struct packet pkt={&st, TCP, ip_proto}; + + pesudo_session_load(&st, &env); + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + plugin_manager_on_polling(plug_mgr); + } + + pesudo_session_unload(&st, &env); + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_on_tcp_called, env.N_session*env.N_per_session_pkt_cnt); + EXPECT_EQ(env.basic_exdata_free_called, env.N_session); + +} + +//TODO: test case, message pub overlimit +#if 0 +static void *test_overlimit_pub_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1); + return ctx; +} + +static void test_overlimit_pub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx; + EXPECT_EQ(ctx->pkt_cnt, env->N_per_session_pkt_cnt); + FREE(ctx); + return; +} + +static void *test_overlimit_sub_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1); + return ctx; +} + +static void test_overlimit_sub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx; + EXPECT_EQ(ctx->sub_cnt, (env->N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1))); //minus intrinsic msg + FREE(ctx); + return; +} + +struct test_overlimit_msg +{ + struct session *sess; + int called; +}; + +static void test_overlimit_pub_on_session(int topic_id, const void *msg, void *plugin_env) +{ + struct session *sess=(struct session *)msg; + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)msg; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + struct test_overlimit_msg *pub_msg; + if (msg) + { + env->test_mq_pub_called += 1; + ctx->pkt_cnt += 1; + for(int i=0; i < MAX_MSG_PER_DISPATCH*2; i++) + { + pub_msg = CALLOC(struct test_overlimit_msg, 1); + pub_msg->called = env->test_mq_pub_called; + pub_msg->sess=sess; + if(i<(MAX_MSG_PER_DISPATCH-1))// minus intrinsic msg + { + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, pub_msg), 0); + ctx->pub_cnt+=1; + } + else + { + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, pub_msg), -1); + FREE(pub_msg); + } + } + } + return; +} + +static void test_overlimit_on_sub_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env; + struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)msg; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + EXPECT_EQ(recv_msg->called, env->test_mq_pub_called); + env->test_mq_sub_called+=1; + ctx->sub_cnt+=1; + return; +} + +static void test_overlimit_session_msg_free(void *msg, void *msg_free_arg) +{ + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)msg_free_arg; + struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg; + if(recv_msg) + { + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, msg), -1);// illegal publish when msg_free + EXPECT_EQ(env->test_mq_pub_called, recv_msg->called); + env->test_mq_free_called+=1; + FREE(msg); + } + return; +} + +TEST(plugin_manager,DISABLED_session_plugin_pub_msg_overlimt) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct session_manager_plugin_env env; + memset(&env, 0, sizeof(struct session_manager_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + env.test_mq_pub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); + EXPECT_GE(env.test_mq_pub_plugin_id, 0); + + env.intrinsc_tcp_input_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); + EXPECT_GE(env.intrinsc_tcp_input_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_input_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, test_overlimit_session_msg_free, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + + env.test_mq_sub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); + EXPECT_GE(env.test_mq_sub_plugin_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_overlimit_on_sub_msg, env.test_mq_sub_plugin_id), 0); + + struct packet pkt={&st, TCP, ip_proto}; + + + struct session sess[env.N_session]; + + for(int i=0; i < env.N_session; i++) + { + sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + stellar_mq_publish_message(&st, env.intrinsc_tcp_input_topic_id, &sess[i]); + } + + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + session_exdata_runtime_free(sess[i].session_exdat_rt); + } + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.test_mq_pub_called,env.N_per_session_pkt_cnt*env.N_session); + EXPECT_EQ(env.test_mq_free_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1)); + EXPECT_EQ(env.test_mq_sub_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1)); + +} +#endif + + +//TODO: test case, publish msg on session closed +#if 0 +struct test_session_closing_ctx +{ + int pkt_called; + int session_free_called; + int userdefine_on_msg_called; +}; + +static void *test_session_closing_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=CALLOC(struct test_session_closing_ctx, 1); + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_new_called+=1; + return ctx; +} + +static void test_session_closing_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_free_called+=1; + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)session_ctx; + EXPECT_EQ(ctx->pkt_called,env->N_per_session_pkt_cnt); + EXPECT_EQ(ctx->session_free_called,1); + FREE(ctx); +} + + +static void test_session_closing_on_session_free(struct session *sess, void *per_session_ctx, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx; + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + if(sess->state==SESSION_STATE_CLOSING) + { + ctx->session_free_called+=1; + stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, env); + env->test_mq_pub_called+=1; + } +} + + +static void test_session_closing_on_intrisic_msg( int topic_id, const void *msg, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)msg; + if(msg)ctx->pkt_called+=1; +} + +static void test_session_closing_on_userdefine_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)msg; + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env; + ctx->userdefine_on_msg_called+=1; + EXPECT_EQ(msg, plugin_env); + env->test_mq_sub_called+=1; +} + +TEST(plugin_manager, DISABLED_session_plugin_pub_msg_on_closing) { + + struct stellar st={0}; + struct session_manager_plugin_env env; + memset(&env, 0, sizeof(struct session_manager_plugin_env)); + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + int plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); + EXPECT_GE(plugin_id,0); + + env.intrinsc_tcp_input_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); + EXPECT_GE(env.intrinsc_tcp_input_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_input_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, NULL, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_closing_on_userdefine_msg, plugin_id), 0); + +// pesudo packet and session + + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_OPENING; + sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + sess[i].state=SESSION_STATE_ACTIVE; + stellar_mq_publish_message(&st, env.intrinsc_tcp_input_topic_id, &sess[i]); + } + + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_CLOSING; + session_exdata_runtime_free(sess[i].session_exdat_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + //EXPECT_EQ(env.basic_ctx_new_called,env.N_session); + //EXPECT_EQ(env.basic_ctx_free_called,env.N_session); + //EXPECT_EQ(env.test_mq_pub_called,env.N_session); + //EXPECT_EQ(env.test_mq_sub_called,env.N_session); +} + +#endif + + +//TODO: test case, mq priority +#if 0 + +//test dettach session +static void test_session_mq_priority_plugin_1_on_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->plugin_id_1_called+=1; + if(topic_id == env->intrinsc_tcp_topic_id) + { + struct session *sess = (struct session *)msg; + struct test_session_called_ctx *ctx = + (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_1_id); + if (ctx == NULL) + { + ctx = CALLOC(struct test_session_called_ctx, 1); + session_exdata_set(sess, env->exdata_ctx_1_id, ctx); + } + ctx->called+=1; + EXPECT_EQ(ctx->called%3, 1);// intrinsc msg has high priority + EXPECT_EQ(stellar_mq_publish_message_with_priority(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, STELLAR_MQ_PRIORITY_LOW), 0); + } + if(topic_id == env->test_mq_topic_id) + { + if(ctx->called%3 == 2) + { + EXPECT_EQ((int)(long)msg, env->plugin_id_2); + } + if(ctx->called%3 == 0) + { + EXPECT_EQ((int )(long)msg, env->plugin_id_1); + } + } + return; +} + +static void test_session_mq_priority_plugin_2_on_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + + env->plugin_id_2_called+=1; + + if(topic_id == env->intrinsc_tcp_topic_id) + { + struct session *sess = (struct session *)msg; + struct test_session_called_ctx *ctx = + (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_2_id); + if (ctx == NULL) + { + ctx = CALLOC(struct test_session_called_ctx, 1); + session_exdata_set(sess, env->exdata_ctx_2_id, ctx); + } + ctx->called+=1; + EXPECT_EQ(ctx->called % 3, 1); + // publish msg has normal priority + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_2), 0); + } + if(topic_id == env->test_mq_topic_id) + { + if(ctx->called%3 == 2) + { + EXPECT_EQ((int)(long)msg, env->plugin_id_2); + } + if(ctx->called%3 == 0) + { + EXPECT_EQ((int)(long)msg, env->plugin_id_1); + } + } + return; +} + +TEST(plugin_manager, test_session_mq_priority) { + + struct stellar st={0}; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + int plugin_id_1=stellar_plugin_register(&st, 0,NULL, NULL, &env); + EXPECT_GE(plugin_id_1,0); + + int plugin_id_2=stellar_plugin_register(&st, 0, NULL, NULL, &env); + EXPECT_GE(plugin_id_2,0); + + env.plugin_id_1=plugin_id_1; + env.plugin_id_2=plugin_id_2; + + env.exdata_ctx_1_id=stellar_exdata_new_index(&st, "SESSION_CTX_1", stellar_exdata_free_default, &env) ; + env.exdata_ctx_2_id=stellar_exdata_new_index(&st, "SESSION_CTX_2", stellar_exdata_free_default, &env) ; + + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); + +// pesudo packet and session + + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_OPENING; + sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + sess[i].state=SESSION_STATE_ACTIVE; + stellar_mq_publish_message(&st, env.intrinsc_tcp_topic_id, &sess[i]); + } + + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_CLOSING; + session_exdata_runtime_free(sess[i].session_exdat_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + // each session publish TCP TOPIC per_session_pkt_cnt+1, and SESSION_PRIORITY_TOPIC 2*(msg per_session_pkt_cnt+1) + EXPECT_EQ(env.plugin_id_1_called,env.N_session*((env.N_per_session_pkt_cnt)*3)); + EXPECT_EQ(env.plugin_id_2_called,env.N_session*((env.N_per_session_pkt_cnt)*3)); + +} + +#endif + +// TODO: test case, session_exdata_free_pub_msg +#if 0 + +void test_session_exdata_free_pub_msg_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)arg; + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->intrinsc_tcp_input_topic_id, arg), -1); + env->basic_exdata_free_called+=1; +} + +static void test_session_exdata_free_pub_msg_on_session(int topic_id, const void *msg, void *plugin_env) +{ + struct session *sess=(struct session *)msg; + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env; + EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, sess), 0); + if(msg)env->plugin_id_1_called+=1; +} + +TEST(plugin_manager, session_exdata_free_pub_msg) { + + struct stellar st={0}; + struct session_manager_plugin_env env; + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + env.plugin_id_1=stellar_plugin_register(&st, 0, NULL, NULL, &env); + EXPECT_GE(env.plugin_id_1,0); + + env.intrinsc_tcp_input_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); + EXPECT_GE(env.intrinsc_tcp_input_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_input_topic_id, test_session_exdata_free_pub_msg_on_session, env.plugin_id_1), 0); + + env.basic_exdata_idx=stellar_exdata_new_index(&st, "BASIC_EXDATA", test_session_exdata_free_pub_msg_exdata_free, &env) ; + EXPECT_GE(env.basic_exdata_idx, 0); + +// pesudo packet and session + + memset(&env, 0, sizeof(struct session_manager_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + stellar_mq_publish_message(&st, env.intrinsc_tcp_input_topic_id, &sess[i]); + } + + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + session_exdata_runtime_free(sess[i].session_exdat_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_exdata_free_called,env.N_session); + EXPECT_EQ(env.plugin_id_1_called,env.N_session*env.N_per_session_pkt_cnt); +} + + +#endif + +/********************************************** + * TEST PLUGIN MANAGER ON POLLING PLUGIN RUNTIME * + **********************************************/ + +struct polling_plugin_env +{ + struct plugin_manager_schema *plug_mgr; + int N_polling; + int return0_polling_called; + int return1_polling_called; +}; + +int return1_plugin_on_polling(void *plugin_env) +{ + struct polling_plugin_env *env = (struct polling_plugin_env *)plugin_env; + env->return1_polling_called+=1; + return 1; +} + +int return0_plugin_on_polling(void *plugin_env) +{ + struct polling_plugin_env *env = (struct polling_plugin_env *)plugin_env; + env->return0_polling_called+=1; + return 0; +} + +TEST(plugin_manager, basic_polling_plugins) { + +// pesudo init stage + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + st.plug_mgr=plug_mgr; + struct polling_plugin_env env; + memset(&env, 0, sizeof(struct polling_plugin_env)); + env.plug_mgr=plug_mgr; + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + int plugin_id = stellar_polling_plugin_register(&st, return0_plugin_on_polling, &env); + EXPECT_TRUE(plugin_id>=0); + plugin_id = stellar_polling_plugin_register(&st, return1_plugin_on_polling, &env); + EXPECT_TRUE(plugin_id>=0); + +// pesudo runtime stage + + env.plug_mgr=plug_mgr; + env.N_polling=10; + + for(int i=0; i