diff --git a/include/stellar/module_manager.h b/include/stellar/module_manager.h new file mode 100644 index 0000000..a626575 --- /dev/null +++ b/include/stellar/module_manager.h @@ -0,0 +1,40 @@ +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "stellar/mq.h" + +struct stellar_module; +struct stellar_module *stellar_module_new(const char *name); +void stellar_module_free(struct stellar_module *mod); + +void * stellar_module_get_ctx(struct stellar_module *mod); +void stellar_module_set_ctx(struct stellar_module *mod, void *ctx); + +const char *stellar_module_get_name(struct stellar_module* mod); + +struct stellar_module_manager; + +typedef struct stellar_module *module_on_init_func(struct stellar_module_manager *mod_mgr); +typedef void module_on_exit_func(struct stellar_module_manager *mod_mgr, struct stellar_module *mod); + +struct stellar_module_manager *stellar_module_manager_new(const char *module_spec_toml_path, int max_thread_num, struct mq_schema *mq_schema); +void stellar_module_manager_free(struct stellar_module_manager *mod_mgr); + +struct stellar_module *stellar_module_manager_get_module(struct stellar_module_manager *mod_mgr, const char *module_name); + +void stellar_module_manager_register_thread(struct stellar_module_manager* mod_mgr, int thread_id, struct mq_runtime *mq_rt); + +// return -1 on error +int stellar_module_manager_get_thread_id(struct stellar_module_manager* mod_mgr); +int stellar_module_manager_get_max_thread_num(struct stellar_module_manager* mod_mgr); + +struct mq_schema *stellar_module_get_mq_schema(struct stellar_module_manager *mod_mgr); +struct mq_runtime *stellar_module_get_mq_runtime(struct stellar_module_manager *mod_mgr); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index cc66cfe..397e1a4 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -7,15 +7,12 @@ extern "C" #include -#include "stellar/mq.h" #include "stellar/log.h" #include "stellar/packet.h" struct stellar; -//return plugin_env -typedef void *plugin_on_load_func(struct stellar *st); -typedef void plugin_on_unload_func(void *plugin_env); + typedef void plugin_on_packet_func(struct packet *pkt, void *on_packet_cb_arg); //return 0 if success, otherwise return -1. @@ -28,8 +25,6 @@ int stellar_polling_subscribe(struct stellar *st, plugin_on_polling_func on_pol void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str); -int stellar_get_worker_thread_num(struct stellar *st); -uint16_t stellar_get_current_thread_index(); // only send user build packet, can't send packet which come from network void stellar_send_build_packet(struct stellar *st, struct packet *pkt); @@ -40,9 +35,6 @@ void stellar_loopbreak(struct stellar *st); void stellar_reload_log_level(struct stellar *st); struct logger *stellar_get_logger(struct stellar *st); -struct mq_schema *stellar_get_mq_schema(struct stellar *st); -struct mq_runtime *stellar_get_mq_runtime(struct stellar *st); - #ifdef __cplusplus } #endif diff --git a/infra/CMakeLists.txt b/infra/CMakeLists.txt index 5d73a33..0ad1bb2 100644 --- a/infra/CMakeLists.txt +++ b/infra/CMakeLists.txt @@ -1,4 +1,4 @@ -set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager plugin_manager) +set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager) set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml) #set(DECODERS http lpi) set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS}) diff --git a/infra/exdata/CMakeLists.txt b/infra/exdata/CMakeLists.txt index 2630f57..93e8c8f 100644 --- a/infra/exdata/CMakeLists.txt +++ b/infra/exdata/CMakeLists.txt @@ -1,3 +1,4 @@ add_library(exdata exdata.c) +# //TODO: Add test #add_subdirectory(test) \ No newline at end of file diff --git a/infra/plugin_manager/test/CMakeLists.txt b/infra/exdata/test/CMakeLists.txt similarity index 57% rename from infra/plugin_manager/test/CMakeLists.txt rename to infra/exdata/test/CMakeLists.txt index d16c69a..323e29c 100644 --- a/infra/plugin_manager/test/CMakeLists.txt +++ b/infra/exdata/test/CMakeLists.txt @@ -1,13 +1,14 @@ -add_executable(gtest_plugin_manager -plugin_manager_gtest_main.cpp +add_executable(gtest_module_manager + module_manager_gtest_main.cpp ) + include_directories(${CMAKE_SOURCE_DIR}/infra/plugin_manager/) include_directories(${CMAKE_SOURCE_DIR}/infra/tuple/) target_link_libraries( - gtest_plugin_manager - plugin_manager + gtest_module_manager + module_manager dl "-rdynamic" gtest @@ -15,4 +16,4 @@ target_link_libraries( ) include(GoogleTest) -gtest_discover_tests(gtest_plugin_manager) \ No newline at end of file +gtest_discover_tests(gtest_module_manager) \ No newline at end of file diff --git a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp b/infra/exdata/test/gtest_module_manager_main.cpp similarity index 97% rename from infra/plugin_manager/test/plugin_manager_gtest_main.cpp rename to infra/exdata/test/gtest_module_manager_main.cpp index ec74872..079f9e7 100644 --- a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp +++ b/infra/exdata/test/gtest_module_manager_main.cpp @@ -2,46 +2,8 @@ #include -#include "plugin_manager.h" -#include "plugin_manager_gtest_mock.h" -#define STELLAR_INTRINSIC_TOPIC_NUM 0 -#define TOPIC_NAME_MAX 512 - -#if 0 -void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr) -{ - SCOPED_TRACE("whitebox test intrisic metadata"); - - EXPECT_TRUE(plug_mgr!=NULL); - - EXPECT_EQ(plug_mgr->st, st); - - //load spec null - EXPECT_TRUE(plug_mgr->plugin_load_specs_array==NULL); - - //session exdata schema null - EXPECT_TRUE(plug_mgr->exdata_schema!=NULL); - - //stellar mq schema null - EXPECT_TRUE(plug_mgr->stellar_mq_schema_array==NULL); - - //registered plugin array null - EXPECT_TRUE(plug_mgr->registered_polling_plugin_array==NULL); - EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL); - - EXPECT_TRUE(plug_mgr->per_thread_data!=NULL); - int thread_num=stellar_get_worker_thread_num(st); - for(int i=0; iper_thread_data[i].exdata_array==NULL); - EXPECT_TRUE(plug_mgr->per_thread_data[i].dealth_letter_queue==NULL); - for(int j=0; jper_thread_data[i].priority_mq[j]==NULL); - } -} - -#endif +#include "module_manager/module_manager_interna.h" /*********************************** * TEST PLUGIN MANAGER INIT & EXIT * diff --git a/infra/module_manager/CMakeLists.txt b/infra/module_manager/CMakeLists.txt new file mode 100644 index 0000000..cc5b0f6 --- /dev/null +++ b/infra/module_manager/CMakeLists.txt @@ -0,0 +1,8 @@ +add_library(module_manager module_manager.c) +target_include_directories(module_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) +target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/include/) +target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/) +target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/) +target_link_libraries(module_manager PUBLIC toml ${CMAKE_DL_LIBS}) + +#add_subdirectory(test) \ No newline at end of file diff --git a/infra/module_manager/module_manager.c b/infra/module_manager/module_manager.c new file mode 100644 index 0000000..53f906c --- /dev/null +++ b/infra/module_manager/module_manager.c @@ -0,0 +1,234 @@ +#include "module_manager_interna.h" + +#include "stellar/module_manager.h" +#include "stellar/utils.h" +#include "toml/toml.h" +#include +#include + + +UT_icd module_specs_icd = {sizeof(struct module_specific), NULL, NULL, NULL}; + +static struct module_specific *module_specs_load(const char *toml_conf_path, int *mod_num) +{ + *mod_num = 0; + FILE* fp = fopen(toml_conf_path, "r"); + if(fp==NULL)return NULL; + char errbuf[256]; + toml_table_t* conf = toml_parse_file(fp, errbuf, sizeof(errbuf)); + fclose(fp); + if (!conf) { + fprintf(stderr, "Error parsing toml: %s\n", errbuf); + return NULL; + } + struct module_specific* mod_spec=NULL; + toml_array_t* mod_array = toml_array_in(conf, "module"); + if(mod_array==NULL)goto MODULE_SPEC_LOAD_ERROR; + *mod_num = toml_array_nelem(mod_array); + mod_spec = CALLOC(struct module_specific, *mod_num); + + for (int i = 0; i < *mod_num; i++) { + toml_table_t* toml_mod = toml_table_at(mod_array, i); + + const char *path_raw = toml_raw_in(toml_mod, "path"); + const char *init_func_name_raw = toml_raw_in(toml_mod, "init"); + const char *exit_func_name_raw = toml_raw_in(toml_mod, "exit"); + char *path = NULL; + char *init_func_name = NULL; + char *exit_func_name = NULL; + if (toml_rtos(path_raw, &path) || toml_rtos(init_func_name_raw, &init_func_name) || + toml_rtos(exit_func_name_raw, &exit_func_name)) + { + goto MODULE_SPEC_LOAD_ERROR; + } + void* handle = dlopen(path, RTLD_NOW|RTLD_LAZY|RTLD_GLOBAL); + if (!handle) { + fprintf(stderr, "Error loading plugin %s: %s\n", path, dlerror()); + goto MODULE_SPEC_LOAD_ERROR; + } + + mod_spec[i].load_cb = (module_on_init_func *) dlsym(handle, init_func_name); + if (!mod_spec[i].load_cb) { + fprintf(stderr, "Could not load init function %s: %s\n", init_func_name, dlerror()); + } + + mod_spec[i].unload_cb = (module_on_exit_func *) dlsym(handle, exit_func_name); + if (!mod_spec[i].unload_cb) { + fprintf(stderr, "Could not load exit function %s: %s\n", exit_func_name, dlerror()); + } + FREE(path); + FREE(init_func_name); + FREE(exit_func_name); + } + toml_free(conf); + return mod_spec; +MODULE_SPEC_LOAD_ERROR: + toml_free(conf); + if(mod_spec)FREE(mod_spec); + *mod_num=0; + return NULL; +} + +/******************************************* + * stellar module manager API * + *******************************************/ + +struct stellar_module_manager *stellar_module_manager_new(const char *module_spec_toml_path, int max_thread_num, struct mq_schema *mq_schema) +{ + int spec_num; + struct module_specific *specs = module_specs_load(module_spec_toml_path, &spec_num); + if(spec_num < 0) + { + return NULL; + } + struct stellar_module_manager *mod_mgr = CALLOC(struct stellar_module_manager, 1); + if(spec_num > 0) + { + utarray_new(mod_mgr->schema.module_specs_array,&module_specs_icd); + utarray_reserve(mod_mgr->schema.module_specs_array, spec_num); + } + + mod_mgr->schema.max_thread_num=max_thread_num; + mod_mgr->schema.mq_schema=mq_schema; + + // TODO: store module specific data in hash + + for(int i = 0; i < spec_num; i++) + { + if (specs[i].load_cb != NULL) + { + //TODO: duplicate check mod_name + specs[i].mod=specs[i].load_cb(mod_mgr); + utarray_push_back(mod_mgr->schema.module_specs_array, &specs[i]); + } + } + FREE(specs); + return mod_mgr; +} + +void stellar_module_manager_free(struct stellar_module_manager *mod_mgr) +{ + if(mod_mgr==NULL)return; + struct module_specific *p=NULL; + if (mod_mgr->schema.module_specs_array) + { + while ((p = (struct module_specific *)utarray_next(mod_mgr->schema.module_specs_array, p))) + { + if (p->unload_cb) + p->unload_cb(mod_mgr, p->mod); + } + utarray_free(mod_mgr->schema.module_specs_array); + } +#if 0 + if(plug_mgr->stellar_mq_schema_array) + { + for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++) + { + stellar_mq_destroy_topic( plug_mgr->st, i); + } + utarray_free(plug_mgr->stellar_mq_schema_array); + } + + //if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array); + if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array); + if(plug_mgr->registered_packet_plugin_array) + { + struct registered_plugin_schema *s = NULL; + while ((s = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s))) + { + if(s->registed_mq_subscriber_info)utarray_free(s->registed_mq_subscriber_info); + } + utarray_free(plug_mgr->registered_packet_plugin_array); + } +#endif + + FREE(mod_mgr); + return; +} + +int stellar_module_manager_get_max_thread_num(struct stellar_module_manager*mod_mgr) +{ + if(mod_mgr==NULL)return -1; + return mod_mgr->schema.max_thread_num; +} + +struct mq_schema *stellar_module_get_mq_schema(struct stellar_module_manager *mod_mgr) +{ + if(mod_mgr==NULL)return NULL; + return mod_mgr->schema.mq_schema; +} + +__thread int local_thread_id=-1; +__thread struct mq_runtime *local_mq_rt=NULL; + +int stellar_module_manager_get_thread_id(struct stellar_module_manager* mod_mgr __unused) +{ + return local_thread_id; +} + +struct mq_runtime *stellar_module_get_mq_runtime(struct stellar_module_manager *mod_mgr __unused) +{ + return local_mq_rt; +} + +void stellar_module_manager_register_thread(struct stellar_module_manager* mod_mgr __unused, int thread_id, struct mq_runtime *mq_rt) +{ + local_thread_id=thread_id; + local_mq_rt=mq_rt; + return; +} + +struct stellar_module *stellar_module_manager_get_module(struct stellar_module_manager *mod_mgr, const char *module_name) +{ + if(mod_mgr==NULL)return NULL; + struct module_specific *p=NULL; + if (mod_mgr->schema.module_specs_array) + { + while ((p = (struct module_specific *)utarray_next(mod_mgr->schema.module_specs_array, p))) + { + if(strcmp(p->mod->name, module_name)==0) + { + return p->mod; + } + } + } + return NULL; +} + +/******************************************* + * stellar module API * + *******************************************/ + + +struct stellar_module *stellar_module_new(const char *name) +{ + struct stellar_module *mod = CALLOC(struct stellar_module, 1); + strncpy(mod->name, name, NAME_MAX); + return mod; +} + +void stellar_module_free(struct stellar_module *mod) +{ + if(mod==NULL)return; + FREE(mod); + return; +} + +void * stellar_module_get_ctx(struct stellar_module *mod) +{ + if(mod==NULL)return NULL; + return mod->module_ctx; +} + +void stellar_module_set_ctx(struct stellar_module *mod, void *ctx) +{ + if(mod==NULL)return; + mod->module_ctx=ctx; + return; +} + +const char *stellar_module_get_name(struct stellar_module* mod) +{ + if(mod==NULL)return NULL; + return mod->name; +} diff --git a/infra/module_manager/module_manager_interna.h b/infra/module_manager/module_manager_interna.h new file mode 100644 index 0000000..af045e4 --- /dev/null +++ b/infra/module_manager/module_manager_interna.h @@ -0,0 +1,41 @@ +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "stellar/module_manager.h" + +#include "uthash/utarray.h" +#include "stellar/mq.h" + +#include + +struct stellar_module +{ + char name[NAME_MAX]; + void *module_ctx; +}; + +struct stellar_module_manager +{ + struct + { + UT_array *module_specs_array; + int max_thread_num; + struct mq_schema *mq_schema; + }schema; + +}__attribute__((aligned(sizeof(void*)))); + +struct module_specific +{ + struct stellar_module *mod; + module_on_init_func *load_cb; + module_on_exit_func *unload_cb; +}__attribute__((aligned(sizeof(void*)))); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/infra/module_manager/test/CMakeLists.txt b/infra/module_manager/test/CMakeLists.txt new file mode 100644 index 0000000..323e29c --- /dev/null +++ b/infra/module_manager/test/CMakeLists.txt @@ -0,0 +1,19 @@ +add_executable(gtest_module_manager + module_manager_gtest_main.cpp +) + + +include_directories(${CMAKE_SOURCE_DIR}/infra/plugin_manager/) +include_directories(${CMAKE_SOURCE_DIR}/infra/tuple/) + +target_link_libraries( + gtest_module_manager + module_manager + dl + "-rdynamic" + gtest + gmock +) + +include(GoogleTest) +gtest_discover_tests(gtest_module_manager) \ No newline at end of file diff --git a/infra/module_manager/test/gtest_module_manager_main.cpp b/infra/module_manager/test/gtest_module_manager_main.cpp new file mode 100644 index 0000000..079f9e7 --- /dev/null +++ b/infra/module_manager/test/gtest_module_manager_main.cpp @@ -0,0 +1,1617 @@ +#pragma GCC diagnostic ignored "-Wunused-parameter" + +#include + + +#include "module_manager/module_manager_interna.h" + +/*********************************** + * TEST PLUGIN MANAGER INIT & EXIT * + ***********************************/ + +//TODO: test case, plugin_specs_load + +TEST(plugin_manager_init, init_with_null_toml) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + //whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + plugin_manager_exit(plug_mgr); +} + +TEST(plugin_manager_init, init_with_empty_toml) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, "/dev/null"); + //whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + plugin_manager_exit(plug_mgr); +} + +#if 0 + +/****************************************** + * TEST PLUGIN MANAGER PACKET PLUGIN INIT * + ******************************************/ + +static void test_mock_packet_exdata_free(int idx, void *ex_ptr, void *arg){} + +static void test_mock_overwrite_packet_exdata_free(int idx, void *ex_ptr, void *arg){} + +TEST(plugin_manager_init, packet_exdata_new_index_overwrite) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *exdata_name="PACKET_EXDATA"; + int exdata_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_packet_exdata_free, &st); + EXPECT_GE(exdata_idx, 0); + int overwrite_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_overwrite_packet_exdata_free, plug_mgr); + EXPECT_GE(overwrite_idx, 0); + EXPECT_EQ(overwrite_idx, exdata_idx); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct exdata_meta *exdata_schema = (struct exdata_meta *)utarray_eltptr( + plug_mgr->exdata_schema->exdata_meta_array, (unsigned int)exdata_idx); + EXPECT_EQ(exdata_schema->free_func, (void *)test_mock_overwrite_packet_exdata_free); + EXPECT_EQ(exdata_schema->free_arg, plug_mgr); + EXPECT_EQ(exdata_schema->idx, exdata_idx); + EXPECT_STREQ(exdata_schema->name, exdata_name); + + int exdata_num = utarray_len(plug_mgr->exdata_schema->exdata_meta_array); + EXPECT_EQ(exdata_num, 1); + } + + plugin_manager_exit(plug_mgr); +} + +void test_mock_packet_msg_free(void *msg, void *msg_free_arg){} +void test_mock_overwrite_packet_msg_free(void *msg, void *msg_free_arg){} + +TEST(plugin_manager_init, stellar_mq_topic_create_and_update) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *topic_name="PACKET_TOPIC"; + + EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name + + int topic_id = stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_packet_msg_free, &st); + EXPECT_GE(topic_id, 0); + struct stellar_mq_topic_schema *topic_schema = NULL; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id); + EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), + -1); // duplicate create, return error + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, NULL, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), 0); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_overwrite_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, plug_mgr); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); + } + + EXPECT_EQ(stellar_mq_destroy_topic(&st, 10), -1); // illgeal topic_id + + EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 1); + EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 0); // duplicate destroy, return 0; + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); // destory won't delete the topic schema + } + plugin_manager_exit(plug_mgr); +} + +void test_mock_on_packet_msg(int topic_id, const void *msg, void *plugin_env){} +void test_mock_overwrite_on_packet_msg(int topic_id, const void *msg, void *plugin_env){} + +TEST(plugin_manager_init, stellar_mq_subscribe) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + + const char *topic_name="PACKET_TOPIC"; + + int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_packet_msg_free, &st); + EXPECT_GE(topic_id, 0); + + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10),-1);//illgeal plugin_id + EXPECT_EQ(stellar_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10),-1);//illgeal topic_id & plugin_id + + int plugin_id=stellar_plugin_register(&st, NULL, NULL,&st); + EXPECT_GE(plugin_id, 0); + + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0); + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_overwrite_on_packet_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite + struct stellar_mq_topic_schema *topic_schema; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(topic_schema->subscriber_cnt, 1); + EXPECT_EQ(topic_schema->subscribers->plugin_msg_cb, (void *)test_mock_overwrite_on_packet_msg); + + plugin_manager_exit(plug_mgr); +} + + +static void test_mock_session_exdata_free(int idx, void *ex_ptr, void *arg){} +static void test_mock_overwrite_session_exdata_free(int idx, void *ex_ptr, void *arg){} + +TEST(plugin_manager_init, session_exdata_new_index_overwrite) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *exdata_name="SESSION_EXDATA"; + int exdata_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_session_exdata_free, &st); + EXPECT_GE(exdata_idx, 0); + int overwrite_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_overwrite_session_exdata_free, plug_mgr); + EXPECT_GE(overwrite_idx, 0); + EXPECT_EQ(overwrite_idx, exdata_idx); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct exdata_meta *exdata_schema = (struct exdata_meta *)utarray_eltptr( + plug_mgr->exdata_schema->exdata_meta_array, (unsigned int)exdata_idx); + EXPECT_EQ(exdata_schema->free_func, (void *)test_mock_overwrite_session_exdata_free); + EXPECT_EQ(exdata_schema->free_arg, plug_mgr); + EXPECT_EQ(exdata_schema->idx, exdata_idx); + EXPECT_STREQ(exdata_schema->name, exdata_name); + + int exdata_num = utarray_len(plug_mgr->exdata_schema->exdata_meta_array); + EXPECT_EQ(exdata_num, 1); + } + plugin_manager_exit(plug_mgr); +} + +void test_mock_session_msg_free(void *msg, void *msg_free_arg){} + +void test_mock_on_session_msg(int topic_id, const void *msg,void *plugin_env){} +void test_mock_overwrite_on_session_msg(int topic_id, const void *msg, void *plugin_env){} + + +TEST(plugin_manager_init, stellar_mq_subscribe_overwrite) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *topic_name="SESSION_TOPIC"; + + int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_session_msg_free, &st); + EXPECT_GE(topic_id, 0); + + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_session_msg, 10),-1);//illgeal plugin_id + EXPECT_EQ(stellar_mq_subscribe(&st, 10, test_mock_on_session_msg, 10),-1);//illgeal topic_id & plugin_id + + int plugin_id=stellar_plugin_register(&st, NULL, NULL, NULL); + EXPECT_GE(plugin_id, 0); + + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_session_msg, plugin_id),0); + EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_overwrite_on_session_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite + + struct stellar_mq_topic_schema *topic_schema; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,(unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(topic_schema->subscriber_cnt, 1); + EXPECT_EQ(topic_schema->subscribers->plugin_msg_cb, (void *)test_mock_overwrite_on_session_msg); + + plugin_manager_exit(plug_mgr); +} + +/********************************************** + * TEST PLUGIN MANAGER ON POLLING PLUGIN INIT * + **********************************************/ + +int test_plugin_on_polling_func(void *plugin_env) +{ + return 1; +} + +TEST(plugin_manager_init, polling_plugin_register) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + st.plug_mgr=plug_mgr; + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + int plugin_id = stellar_polling_plugin_register(&st, test_plugin_on_polling_func, &st); + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_TRUE(plugin_id>=0); + struct registered_polling_plugin_schema *schema = (struct registered_polling_plugin_schema *)utarray_eltptr( + plug_mgr->registered_polling_plugin_array, (unsigned int)(plugin_id)); + EXPECT_EQ(schema->on_polling, (void *)test_plugin_on_polling_func); + EXPECT_EQ(schema->plugin_env, &st); + EXPECT_EQ(utarray_len(plug_mgr->registered_polling_plugin_array), 1); + } + + plugin_manager_exit(plug_mgr); +} + +/******************************************* + * TEST PLUGIN MANAGER PACKET PLUGIN RUNTIME* + *******************************************/ + +#define PACKET_PROTO_PLUGIN_NUM 128 +#define PACKET_EXDATA_NUM 2 +#define PACKET_TOPIC_NUM 2 +#define PACKET_MQ_SUB_NUM 2 +struct packet_plugin_env +{ + struct plugin_manager_schema *plug_mgr; + int basic_on_packet_called; + int proto_filter_plugin_id[PACKET_PROTO_PLUGIN_NUM]; + int proto_filter_plugin_called[PACKET_PROTO_PLUGIN_NUM]; + int exdata_set_on_packet_called; + int exdata_get_on_packet_called; + unsigned int packet_exdata_idx[PACKET_EXDATA_NUM]; + int exdata_free_called[PACKET_EXDATA_NUM]; + unsigned int packet_topic_id[PACKET_TOPIC_NUM]; + unsigned int packet_mq_sub_plugin_id[PACKET_MQ_SUB_NUM]; + int msg_pub_cnt; + int msg_sub_cnt; + int msg_free_cnt; +}; + +static void test_basic_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set + EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get + env->basic_on_packet_called+=1; + return; +} + +TEST(plugin_manager, packet_plugin_illegal_exdata) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + int plugin_id=stellar_plugin_register(&st, test_basic_on_packet, NULL,&env); + EXPECT_GE(plugin_id, 0); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + int packet_plugin_num = utarray_len(plug_mgr->registered_packet_plugin_array); + EXPECT_EQ(packet_plugin_num, 1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_on_packet_called, 1); +} + +static void test_proto_filter_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set + EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get + //env->proto_filter_plugin_called[ip_protocol]+=1; + return; +} + +TEST(plugin_manager, DISABLED_packet_plugins_with_proto_filter) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + + int proto_filter_plugin_num=(int)(sizeof(env.proto_filter_plugin_id) / sizeof(env.proto_filter_plugin_id[0])); + for (int i = 0; i < proto_filter_plugin_num; i++) + { + env.proto_filter_plugin_id[i] = stellar_plugin_register(&st, test_proto_filter_on_packet, NULL,&env); + EXPECT_GE(env.proto_filter_plugin_id[i], 0); + + + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), proto_filter_plugin_num); + } + + struct packet pkt={&st, IPv4, 0}; + + int N_packet=10; + for (int j = 0; j < N_packet; j++) + { + for (int i = 0; i < proto_filter_plugin_num; i++) + { + pkt.ip_proto = i; + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + } + plugin_manager_exit(plug_mgr); + + for (int i = 0; i < proto_filter_plugin_num; i++) + { + EXPECT_EQ(env.proto_filter_plugin_called[i], N_packet); + } +} + + +struct test_exdata +{ + struct packet *pkt; + struct session *sess; + long long value; +}; + +static void test_exdata_set_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + env->exdata_set_on_packet_called+=1; + + int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0])); + + for(int i=0; ivalue=i; + exdata_val->pkt=pkt; + EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[i], exdata_val), 0); + } + return; +} + +static void test_exdata_get_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + + int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0])); + + for(int i=0; ipacket_exdata_idx[i]); + EXPECT_EQ(exdata_val->value, i); + } + env->exdata_get_on_packet_called+=1; + return; +} + +static void test_packet_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)arg; + struct test_exdata *exdata_val=(struct test_exdata *)ex_ptr; + EXPECT_EQ(env->packet_exdata_idx[idx], idx); + EXPECT_EQ(exdata_val->value, idx); + + EXPECT_EQ(packet_exdata_get(exdata_val->pkt, idx), nullptr);// illegal get in exdata_free callback + EXPECT_EQ(packet_exdata_set(exdata_val->pkt, idx, exdata_val->pkt), -1);// illegal set in exdata_free callback + + FREE(ex_ptr); + env->exdata_free_called[idx]+=1; + + return; +} + + +TEST(plugin_manager, packet_plugins_share_exdata) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + + char exdata_name[PACKET_EXDATA_NUM][TOPIC_NAME_MAX]; + int exdata_idx_len=(int)(sizeof(env.packet_exdata_idx) / sizeof(env.packet_exdata_idx[0])); + for(int i=0; iexdata_schema->exdata_meta_array, env.packet_exdata_idx[i]); + + EXPECT_EQ(exdata_schema->free_func, (void *)test_packet_exdata_free); + EXPECT_EQ(exdata_schema->free_arg, &env); + EXPECT_EQ(exdata_schema->idx, env.packet_exdata_idx[i]); + EXPECT_STREQ(exdata_schema->name, exdata_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->exdata_schema->exdata_meta_array), exdata_idx_len); + } + + int exdata_set_plugin_id=stellar_plugin_register(&st, test_exdata_set_on_packet, NULL,&env); + EXPECT_GE(exdata_set_plugin_id, 0); + + int exdata_get_plugin_id=stellar_plugin_register(&st, test_exdata_get_on_packet, NULL,&env); + EXPECT_GE(exdata_get_plugin_id, 0); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), 2); // Fix plugin number + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + + for(int i=0; i < N_packet; i++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.exdata_set_on_packet_called, N_packet); + EXPECT_EQ(env.exdata_get_on_packet_called, N_packet); + + for(int i=0; i < exdata_idx_len; i++) + { + EXPECT_EQ(env.exdata_free_called[i], N_packet); + } +} + +static void test_packet_msg_free_cb_func(void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + return; +} + +static void test_mq_on_packet_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + env->msg_sub_cnt+=1; + return; +} + +static void test_mq_pub_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); + for(int i=0; iplug_mgr->st, env->packet_topic_id[i], pkt), 0); + env->msg_pub_cnt+=1; + } + return; +} + +TEST(plugin_manager, packet_plugins_mq_pub_sub) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; + + int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); + + for(int i=0; istellar_mq_schema_array, env.packet_topic_id[i]); + EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func); + EXPECT_EQ(topic->free_cb_arg, &env); + EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); + EXPECT_STREQ(topic->topic_name, topic_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); + } + + int pub_plugin_id=stellar_plugin_register(&st, test_mq_pub_on_packet, NULL,&env); + EXPECT_GE(pub_plugin_id, 0); + + int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); + + for (int i = 0; i < topic_sub_num; i++) + { + env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL,&env);// empty on_packet is ok + EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0); + for(int j = 0; j < topic_id_num; j++) + { + EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], test_mq_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + for (int i = 0; i < N_packet; i++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + plugin_manager_exit(plug_mgr); + EXPECT_EQ(N_packet*topic_id_num, env.msg_pub_cnt); + EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); + EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); +} + +static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + FREE(msg); + return; +} + +static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + env->msg_sub_cnt+=1; + return; +} + +static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); + unsigned int cnt=0; + int *msg; + for(int i=0; iplug_mgr->max_message_dispatch; j++) + { + msg=CALLOC(int, 1); + *msg=cnt; + int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg); + if(cnt < env->plug_mgr->max_message_dispatch) + { + ASSERT_EQ(pub_ret, 0); + env->msg_pub_cnt+=1; + } + else + { + ASSERT_EQ(pub_ret, -1); + } + if(pub_ret!=0)FREE(msg); + cnt+=1; + } + } + return; +} + +TEST(plugin_manager, packet_plugins_pub_overlimit) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; + + int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); + + for(int i=0; istellar_mq_schema_array, env.packet_topic_id[i]); + EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func); + EXPECT_EQ(topic->free_cb_arg, &env); + EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); + EXPECT_STREQ(topic->topic_name, topic_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); + } + + int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env); + EXPECT_GE(pub_plugin_id, 0); + + int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); + + for (int i = 0; i < topic_sub_num; i++) + { + env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok + EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0); + for(int j = 0; j < topic_id_num; j++) + { + EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + for (int i = 0; i < N_packet; i++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + plugin_manager_exit(plug_mgr); + EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt); + EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); + EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); +} + + + +static void test_exdata_free_pub_msg_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)arg; + EXPECT_EQ(env->packet_exdata_idx[idx], idx); + env->exdata_free_called[idx]+=1; + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[0], (struct packet *)ex_ptr), -1);// publish message in packet exdata_free is illegal + env->msg_pub_cnt+=1; + return; +} + +static void test_exdata_free_pub_msg_free( void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[0], msg), -1 );// publish message in packet msg_free is illegal + return; +} + +static void test_exdata_free_pub_msg_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[0], pkt), 0); + env->basic_on_packet_called+=1; + return; +} + +static void test_exdata_free_pub_msg_on_packet_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_EQ(topic_id, env->packet_topic_id[0]); + env->msg_sub_cnt+=1; +} + +TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + int plugin_id=stellar_plugin_register(&st, test_exdata_free_pub_msg_on_packet, NULL,&env); + EXPECT_GE(plugin_id, 0); + + env.packet_exdata_idx[0]=stellar_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env); + env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", NULL, NULL, test_exdata_free_pub_msg_free, &env); + + EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0); + + + struct packet pkt={&st, IPv4, ip_proto}; + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_on_packet_called, 1); + EXPECT_EQ(env.msg_pub_cnt, 1); + EXPECT_EQ(env.msg_sub_cnt, 0); + EXPECT_EQ(env.msg_free_cnt, 0); + EXPECT_EQ(env.exdata_free_called[0], 1); +} + + +/********************************************** + * PESUDO SESSION MANAGER PLUGIN API * + **********************************************/ + +struct session_manager_plugin_env +{ + struct stellar *st; + int N_session; + int N_per_session_pkt_cnt; + struct session sess[1024]; + + int session_manager_plugin_id; + + int intrinsc_tcp_input_topic_id; + int intrinsc_tcp_output_topic_id; + int intrinsc_tcp_stream_topic_id; + + int intrinsc_udp_input_topic_id; + int intrinsc_udp_output_topic_id; + + int basic_exdata_idx; + int basic_exdata_free_called; + int basic_on_tcp_called; + int basic_ctx_new_called; + int basic_ctx_free_called; + int test_mq_pub_plugin_id; + int test_mq_sub_plugin_id; + int test_mq_pub_called; + int test_mq_sub_called; + int test_mq_free_called; + int test_mq_topic_id; + int plugin_id_1; + int plugin_id_2; + int plugin_id_1_called; + int plugin_id_2_called; + int exdata_ctx_1_id; + int exdata_ctx_2_id; +}; + +typedef void on_session_msg_cb_func(int topic_id, struct session *sess, void *plugin_env); + +static void pesudo_on_msg_dispatch(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *dispatch_arg, void *sub_plugin_env) +{ + on_session_msg_cb_func *session_cb = (on_session_msg_cb_func *)on_msg_cb; + struct session *sess=(struct session *)msg; + session_cb(topic_id, sess, sub_plugin_env); +} +static void pesudo_on_packet_input(struct packet *pkt, void *plugin_env) +{ + struct session_manager_plugin_env *env=(struct session_manager_plugin_env *)plugin_env; + for (int i = 0; i < env->N_session; i++) + { + stellar_mq_publish_message(env->st, env->intrinsc_tcp_input_topic_id, &env->sess[i]); + } +} + +static void pesudo_on_packet_output(struct packet *pkt, void *plugin_env) +{ + struct session_manager_plugin_env *env=(struct session_manager_plugin_env *)plugin_env; + for (int i = 0; i < env->N_session; i++) + { + stellar_mq_publish_message(env->st, env->intrinsc_tcp_output_topic_id, &env->sess[i]); + } +} + +static void pesudo_session_load(struct stellar *st, struct session_manager_plugin_env *env) +{ + env->st=st; + for(int i=0; i N_session; i++) + { + env->sess[i].session_exdat_rt=session_exdata_runtime_new(st); + env->sess[i].type=SESSION_TYPE_TCP; + } + env->intrinsc_tcp_input_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL, env); + + env->session_manager_plugin_id=stellar_plugin_register(st, pesudo_on_packet_input, pesudo_on_packet_output, env); +} + +void pesudo_session_unload(struct stellar *st, struct session_manager_plugin_env *env) +{ + for(int i=0; i N_session; i++) + { + session_exdata_runtime_free(env->sess[i].session_exdat_rt); + } +} + +static int pesudo_tcp_session_subscribe(struct stellar *st, on_session_msg_cb_func *on_session_cb, int plugin_id) +{ + int topic_id=stellar_mq_get_topic_id(st, TOPIC_TCP); + if(topic_id<0) + { + topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL, NULL); + } + return stellar_mq_subscribe(st, topic_id, (on_msg_cb_func *)on_session_cb, plugin_id); +} + +/********************************************** + * SESSION MANAGER PLUGIN RUNTIME * + **********************************************/ + + +TEST(plugin_manager, no_plugin_register_runtime) { + + struct stellar st={0}; + +// init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + +// prepare packet and session + + struct session_manager_plugin_env env; + memset(&env, 0, sizeof(struct session_manager_plugin_env)); + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + +// load session manager plugin + pesudo_session_load(&st, &env); + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + + plugin_manager_on_packet_output(plug_mgr, &pkt); + + } + +// unload session manager plugin + pesudo_session_unload(&st, &env); + +//exit stage + plugin_manager_exit(plug_mgr); +} + + +static void test_basic_on_tcp_session(int topic_id, struct session *sess, void *plugin_env) +{ + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + if(sess) + { + EXPECT_EQ(session_exdata_set(sess, 2, sess), -1);// illegal set + EXPECT_EQ(session_exdata_get(sess, 2), nullptr);// illegal get + long long called = (long long )session_exdata_get(sess, env->basic_exdata_idx); + EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, (void *)(called+1)), 0); + env->basic_on_tcp_called+=1; + } + return; +} + +static void test_basic_session_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)arg; + EXPECT_EQ(env->basic_exdata_idx, idx); + EXPECT_EQ((long long )ex_ptr, env->N_per_session_pkt_cnt); + + env->basic_exdata_free_called+=1; +} + +TEST(plugin_manager, session_plugin_on_tcp) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL,MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct session_manager_plugin_env env; + memset(&env, 0, sizeof(env)); + env.N_per_session_pkt_cnt=10; + env.N_session=1; + + int plugin_id=stellar_plugin_register(&st, NULL, NULL, &env); + EXPECT_GE(plugin_id, 0); + + EXPECT_EQ(pesudo_tcp_session_subscribe(&st, test_basic_on_tcp_session, plugin_id), 0); + + env.basic_exdata_idx=stellar_exdata_new_index(&st, "SESSION_EXDATA", test_basic_session_exdata_free,&env); + EXPECT_GE(env.basic_exdata_idx, 0); + + struct packet pkt={&st, TCP, ip_proto}; + + pesudo_session_load(&st, &env); + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + plugin_manager_on_polling(plug_mgr); + } + + pesudo_session_unload(&st, &env); + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_on_tcp_called, env.N_session*env.N_per_session_pkt_cnt); + EXPECT_EQ(env.basic_exdata_free_called, env.N_session); + +} + +//TODO: test case, message pub overlimit +#if 0 +static void *test_overlimit_pub_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1); + return ctx; +} + +static void test_overlimit_pub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx; + EXPECT_EQ(ctx->pkt_cnt, env->N_per_session_pkt_cnt); + FREE(ctx); + return; +} + +static void *test_overlimit_sub_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1); + return ctx; +} + +static void test_overlimit_sub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx; + EXPECT_EQ(ctx->sub_cnt, (env->N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1))); //minus intrinsic msg + FREE(ctx); + return; +} + +struct test_overlimit_msg +{ + struct session *sess; + int called; +}; + +static void test_overlimit_pub_on_session(int topic_id, const void *msg, void *plugin_env) +{ + struct session *sess=(struct session *)msg; + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)msg; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + struct test_overlimit_msg *pub_msg; + if (msg) + { + env->test_mq_pub_called += 1; + ctx->pkt_cnt += 1; + for(int i=0; i < MAX_MSG_PER_DISPATCH*2; i++) + { + pub_msg = CALLOC(struct test_overlimit_msg, 1); + pub_msg->called = env->test_mq_pub_called; + pub_msg->sess=sess; + if(i<(MAX_MSG_PER_DISPATCH-1))// minus intrinsic msg + { + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, pub_msg), 0); + ctx->pub_cnt+=1; + } + else + { + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, pub_msg), -1); + FREE(pub_msg); + } + } + } + return; +} + +static void test_overlimit_on_sub_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env; + struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)msg; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + EXPECT_EQ(recv_msg->called, env->test_mq_pub_called); + env->test_mq_sub_called+=1; + ctx->sub_cnt+=1; + return; +} + +static void test_overlimit_session_msg_free(void *msg, void *msg_free_arg) +{ + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)msg_free_arg; + struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg; + if(recv_msg) + { + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, msg), -1);// illegal publish when msg_free + EXPECT_EQ(env->test_mq_pub_called, recv_msg->called); + env->test_mq_free_called+=1; + FREE(msg); + } + return; +} + +TEST(plugin_manager,DISABLED_session_plugin_pub_msg_overlimt) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct session_manager_plugin_env env; + memset(&env, 0, sizeof(struct session_manager_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + env.test_mq_pub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); + EXPECT_GE(env.test_mq_pub_plugin_id, 0); + + env.intrinsc_tcp_input_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); + EXPECT_GE(env.intrinsc_tcp_input_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_input_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, test_overlimit_session_msg_free, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + + env.test_mq_sub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); + EXPECT_GE(env.test_mq_sub_plugin_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_overlimit_on_sub_msg, env.test_mq_sub_plugin_id), 0); + + struct packet pkt={&st, TCP, ip_proto}; + + + struct session sess[env.N_session]; + + for(int i=0; i < env.N_session; i++) + { + sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + stellar_mq_publish_message(&st, env.intrinsc_tcp_input_topic_id, &sess[i]); + } + + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + session_exdata_runtime_free(sess[i].session_exdat_rt); + } + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.test_mq_pub_called,env.N_per_session_pkt_cnt*env.N_session); + EXPECT_EQ(env.test_mq_free_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1)); + EXPECT_EQ(env.test_mq_sub_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1)); + +} +#endif + + +//TODO: test case, publish msg on session closed +#if 0 +struct test_session_closing_ctx +{ + int pkt_called; + int session_free_called; + int userdefine_on_msg_called; +}; + +static void *test_session_closing_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=CALLOC(struct test_session_closing_ctx, 1); + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_new_called+=1; + return ctx; +} + +static void test_session_closing_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_free_called+=1; + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)session_ctx; + EXPECT_EQ(ctx->pkt_called,env->N_per_session_pkt_cnt); + EXPECT_EQ(ctx->session_free_called,1); + FREE(ctx); +} + + +static void test_session_closing_on_session_free(struct session *sess, void *per_session_ctx, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx; + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + if(sess->state==SESSION_STATE_CLOSING) + { + ctx->session_free_called+=1; + stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, env); + env->test_mq_pub_called+=1; + } +} + + +static void test_session_closing_on_intrisic_msg( int topic_id, const void *msg, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)msg; + if(msg)ctx->pkt_called+=1; +} + +static void test_session_closing_on_userdefine_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)msg; + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env; + ctx->userdefine_on_msg_called+=1; + EXPECT_EQ(msg, plugin_env); + env->test_mq_sub_called+=1; +} + +TEST(plugin_manager, DISABLED_session_plugin_pub_msg_on_closing) { + + struct stellar st={0}; + struct session_manager_plugin_env env; + memset(&env, 0, sizeof(struct session_manager_plugin_env)); + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + int plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); + EXPECT_GE(plugin_id,0); + + env.intrinsc_tcp_input_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); + EXPECT_GE(env.intrinsc_tcp_input_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_input_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, NULL, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_closing_on_userdefine_msg, plugin_id), 0); + +// pesudo packet and session + + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_OPENING; + sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + sess[i].state=SESSION_STATE_ACTIVE; + stellar_mq_publish_message(&st, env.intrinsc_tcp_input_topic_id, &sess[i]); + } + + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_CLOSING; + session_exdata_runtime_free(sess[i].session_exdat_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + //EXPECT_EQ(env.basic_ctx_new_called,env.N_session); + //EXPECT_EQ(env.basic_ctx_free_called,env.N_session); + //EXPECT_EQ(env.test_mq_pub_called,env.N_session); + //EXPECT_EQ(env.test_mq_sub_called,env.N_session); +} + +#endif + + +//TODO: test case, mq priority +#if 0 + +//test dettach session +static void test_session_mq_priority_plugin_1_on_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->plugin_id_1_called+=1; + if(topic_id == env->intrinsc_tcp_topic_id) + { + struct session *sess = (struct session *)msg; + struct test_session_called_ctx *ctx = + (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_1_id); + if (ctx == NULL) + { + ctx = CALLOC(struct test_session_called_ctx, 1); + session_exdata_set(sess, env->exdata_ctx_1_id, ctx); + } + ctx->called+=1; + EXPECT_EQ(ctx->called%3, 1);// intrinsc msg has high priority + EXPECT_EQ(stellar_mq_publish_message_with_priority(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, STELLAR_MQ_PRIORITY_LOW), 0); + } + if(topic_id == env->test_mq_topic_id) + { + if(ctx->called%3 == 2) + { + EXPECT_EQ((int)(long)msg, env->plugin_id_2); + } + if(ctx->called%3 == 0) + { + EXPECT_EQ((int )(long)msg, env->plugin_id_1); + } + } + return; +} + +static void test_session_mq_priority_plugin_2_on_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + + env->plugin_id_2_called+=1; + + if(topic_id == env->intrinsc_tcp_topic_id) + { + struct session *sess = (struct session *)msg; + struct test_session_called_ctx *ctx = + (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_2_id); + if (ctx == NULL) + { + ctx = CALLOC(struct test_session_called_ctx, 1); + session_exdata_set(sess, env->exdata_ctx_2_id, ctx); + } + ctx->called+=1; + EXPECT_EQ(ctx->called % 3, 1); + // publish msg has normal priority + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_2), 0); + } + if(topic_id == env->test_mq_topic_id) + { + if(ctx->called%3 == 2) + { + EXPECT_EQ((int)(long)msg, env->plugin_id_2); + } + if(ctx->called%3 == 0) + { + EXPECT_EQ((int)(long)msg, env->plugin_id_1); + } + } + return; +} + +TEST(plugin_manager, test_session_mq_priority) { + + struct stellar st={0}; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + int plugin_id_1=stellar_plugin_register(&st, 0,NULL, NULL, &env); + EXPECT_GE(plugin_id_1,0); + + int plugin_id_2=stellar_plugin_register(&st, 0, NULL, NULL, &env); + EXPECT_GE(plugin_id_2,0); + + env.plugin_id_1=plugin_id_1; + env.plugin_id_2=plugin_id_2; + + env.exdata_ctx_1_id=stellar_exdata_new_index(&st, "SESSION_CTX_1", stellar_exdata_free_default, &env) ; + env.exdata_ctx_2_id=stellar_exdata_new_index(&st, "SESSION_CTX_2", stellar_exdata_free_default, &env) ; + + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); + +// pesudo packet and session + + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_OPENING; + sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + sess[i].state=SESSION_STATE_ACTIVE; + stellar_mq_publish_message(&st, env.intrinsc_tcp_topic_id, &sess[i]); + } + + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_CLOSING; + session_exdata_runtime_free(sess[i].session_exdat_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + // each session publish TCP TOPIC per_session_pkt_cnt+1, and SESSION_PRIORITY_TOPIC 2*(msg per_session_pkt_cnt+1) + EXPECT_EQ(env.plugin_id_1_called,env.N_session*((env.N_per_session_pkt_cnt)*3)); + EXPECT_EQ(env.plugin_id_2_called,env.N_session*((env.N_per_session_pkt_cnt)*3)); + +} + +#endif + +// TODO: test case, session_exdata_free_pub_msg +#if 0 + +void test_session_exdata_free_pub_msg_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)arg; + EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->intrinsc_tcp_input_topic_id, arg), -1); + env->basic_exdata_free_called+=1; +} + +static void test_session_exdata_free_pub_msg_on_session(int topic_id, const void *msg, void *plugin_env) +{ + struct session *sess=(struct session *)msg; + struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env; + EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, sess), 0); + if(msg)env->plugin_id_1_called+=1; +} + +TEST(plugin_manager, session_exdata_free_pub_msg) { + + struct stellar st={0}; + struct session_manager_plugin_env env; + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + env.plugin_id_1=stellar_plugin_register(&st, 0, NULL, NULL, &env); + EXPECT_GE(env.plugin_id_1,0); + + env.intrinsc_tcp_input_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); + EXPECT_GE(env.intrinsc_tcp_input_topic_id, 0); + EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_input_topic_id, test_session_exdata_free_pub_msg_on_session, env.plugin_id_1), 0); + + env.basic_exdata_idx=stellar_exdata_new_index(&st, "BASIC_EXDATA", test_session_exdata_free_pub_msg_exdata_free, &env) ; + EXPECT_GE(env.basic_exdata_idx, 0); + +// pesudo packet and session + + memset(&env, 0, sizeof(struct session_manager_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + stellar_mq_publish_message(&st, env.intrinsc_tcp_input_topic_id, &sess[i]); + } + + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + session_exdata_runtime_free(sess[i].session_exdat_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_exdata_free_called,env.N_session); + EXPECT_EQ(env.plugin_id_1_called,env.N_session*env.N_per_session_pkt_cnt); +} + + +#endif + +/********************************************** + * TEST PLUGIN MANAGER ON POLLING PLUGIN RUNTIME * + **********************************************/ + +struct polling_plugin_env +{ + struct plugin_manager_schema *plug_mgr; + int N_polling; + int return0_polling_called; + int return1_polling_called; +}; + +int return1_plugin_on_polling(void *plugin_env) +{ + struct polling_plugin_env *env = (struct polling_plugin_env *)plugin_env; + env->return1_polling_called+=1; + return 1; +} + +int return0_plugin_on_polling(void *plugin_env) +{ + struct polling_plugin_env *env = (struct polling_plugin_env *)plugin_env; + env->return0_polling_called+=1; + return 0; +} + +TEST(plugin_manager, basic_polling_plugins) { + +// pesudo init stage + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + st.plug_mgr=plug_mgr; + struct polling_plugin_env env; + memset(&env, 0, sizeof(struct polling_plugin_env)); + env.plug_mgr=plug_mgr; + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + int plugin_id = stellar_polling_plugin_register(&st, return0_plugin_on_polling, &env); + EXPECT_TRUE(plugin_id>=0); + plugin_id = stellar_polling_plugin_register(&st, return1_plugin_on_polling, &env); + EXPECT_TRUE(plugin_id>=0); + +// pesudo runtime stage + + env.plug_mgr=plug_mgr; + env.N_polling=10; + + for(int i=0; i -#include - -#include "stellar_core.h" - -#if 0 -void stellar_per_stage_message_counter_incby(struct plugin_manager_schema *plug_mgr, int tid, long long increment) -{ - plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=increment; -} - -void stellar_per_stage_message_counter_set(struct plugin_manager_schema *plug_mgr, int tid, long long increment) -{ - plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=increment; -} - -bool stellar_per_stage_message_counter_overlimt(struct plugin_manager_schema *plug_mgr, int tid) -{ - if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return true; - return false; -} -#endif - -UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; - -static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num) -{ - *spec_num = 0; - FILE* fp = fopen(toml_conf_path, "r"); - if(fp==NULL)return NULL; - char errbuf[256]; - toml_table_t* conf = toml_parse_file(fp, errbuf, sizeof(errbuf)); - fclose(fp); - if (!conf) { - fprintf(stderr, "Error parsing toml: %s\n", errbuf); - return NULL; - } - struct plugin_specific* plugins=NULL; - toml_array_t* plugin_array = toml_array_in(conf, "plugin"); - if(plugin_array==NULL)goto PLUGIN_SPEC_LOAD_ERROR; - *spec_num = toml_array_nelem(plugin_array); - plugins = CALLOC(struct plugin_specific, *spec_num); - - for (int i = 0; i < *spec_num; i++) { - toml_table_t* plugin = toml_table_at(plugin_array, i); - - const char *path_raw = toml_raw_in(plugin, "path"); - const char *init_func_name_raw = toml_raw_in(plugin, "init"); - const char *exit_func_name_raw = toml_raw_in(plugin, "exit"); - char *path = NULL; - char *init_func_name = NULL; - char *exit_func_name = NULL; - if (toml_rtos(path_raw, &path) || toml_rtos(init_func_name_raw, &init_func_name) || - toml_rtos(exit_func_name_raw, &exit_func_name)) - { - goto PLUGIN_SPEC_LOAD_ERROR; - } - void* handle = dlopen(path, RTLD_NOW|RTLD_LAZY|RTLD_GLOBAL); - if (!handle) { - fprintf(stderr, "Error loading plugin %s: %s\n", path, dlerror()); - goto PLUGIN_SPEC_LOAD_ERROR; - } - - plugins[i].load_cb = (plugin_on_load_func *) dlsym(handle, init_func_name); - if (!plugins[i].load_cb) { - fprintf(stderr, "Could not load init function %s: %s\n", init_func_name, dlerror()); - } - - plugins[i].unload_cb = (plugin_on_unload_func *) dlsym(handle, exit_func_name); - if (!plugins[i].unload_cb) { - fprintf(stderr, "Could not load exit function %s: %s\n", exit_func_name, dlerror()); - } - FREE(path); - FREE(init_func_name); - FREE(exit_func_name); - } - toml_free(conf); - return plugins; -PLUGIN_SPEC_LOAD_ERROR: - toml_free(conf); - if(plugins)FREE(plugins); - return NULL; -} - -#if 0 -static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st) -{ - if(st == NULL)return NULL; - int thread_num=stellar_get_worker_thread_num(st); - struct plugin_manager_per_thread_data *per_thread_data = CALLOC(struct plugin_manager_per_thread_data, thread_num); - return per_thread_data; -} -static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread_data *per_thread_data, struct stellar *st) -{ - if(per_thread_data == NULL || st == NULL)return; - int thread_num=stellar_get_worker_thread_num(st); - struct plugin_manager_per_thread_data *p_data; - for (int i = 0; i < thread_num; i++) - { - p_data=per_thread_data+i; - exdata_handle_free(p_data->exdata_array); - } - FREE(per_thread_data); - return; -} -#endif - -struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path) -{ - int spec_num; - struct plugin_specific *specs = plugin_specs_load(plugin_spec_file_path, &spec_num); - if(spec_num < 0) - { - return NULL; - } - struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1); - //plug_mgr->max_message_dispatch=max_msg_per_stage; - if(spec_num > 0) - { - utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd); - utarray_reserve(plug_mgr->plugin_load_specs_array, spec_num); - } - - plug_mgr->st = st; - stellar_set_plugin_manger(st, plug_mgr); - - //plug_mgr->exdata_schema=exdata_schema_new(); - - for(int i = 0; i < spec_num; i++) - { - if (specs[i].load_cb != NULL) - { - specs[i].plugin_ctx=specs[i].load_cb(st); - utarray_push_back(plug_mgr->plugin_load_specs_array, &specs[i]); - } - } - FREE(specs); - //plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st); - return plug_mgr; -} - -void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) -{ - if(plug_mgr==NULL)return; - struct plugin_specific *p=NULL; - if (plug_mgr->plugin_load_specs_array) - { - while ((p = (struct plugin_specific *)utarray_next(plug_mgr->plugin_load_specs_array, p))) - { - if (p->unload_cb) - p->unload_cb(p->plugin_ctx); - } - utarray_free(plug_mgr->plugin_load_specs_array); - } -#if 0 - if(plug_mgr->stellar_mq_schema_array) - { - for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++) - { - stellar_mq_destroy_topic( plug_mgr->st, i); - } - utarray_free(plug_mgr->stellar_mq_schema_array); - } - - //if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array); - if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array); - if(plug_mgr->registered_packet_plugin_array) - { - struct registered_plugin_schema *s = NULL; - while ((s = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s))) - { - if(s->registed_mq_subscriber_info)utarray_free(s->registed_mq_subscriber_info); - } - utarray_free(plug_mgr->registered_packet_plugin_array); - } -#endif - //plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st); - //exdata_schema_free(plug_mgr->exdata_schema); - FREE(plug_mgr); - return; -} - -/******************************* - * STELLAR EXDATA * - *******************************/ -#if 0 -int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *free_arg) -{ - if(st==NULL || name==NULL)return -1; - struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr->exdata_schema==NULL)return -1; - return exdata_new_index(plug_mgr->exdata_schema, name, free_func, free_arg); -} - -/******************************* - * PACKET EXDATA * - *******************************/ -static struct exdata_runtime *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr) -{ - if(plug_mgr==NULL || plug_mgr->exdata_schema == NULL)return NULL; - int tid=stellar_get_current_thread_index(); - if(plug_mgr->per_thread_data[tid].exdata_array == NULL) - { - plug_mgr->per_thread_data[tid].exdata_array = exdata_handle_new(plug_mgr->exdata_schema); - } - return plug_mgr->per_thread_data[tid].exdata_array; -} - -static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr) -{ - if(plug_mgr==NULL || plug_mgr->exdata_schema == NULL)return; - struct exdata_runtime *per_thread_exdata_handle = per_thread_packet_exdata_arrary_get(plug_mgr); - return exdata_handle_reset(per_thread_exdata_handle); -} - -int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr) -{ - if(pkt == NULL)return -1; - struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt); - return exdata_set(per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr); -} - -void *packet_exdata_get(struct packet *pkt, int idx) -{ - if(pkt == NULL)return NULL; - struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt); - return exdata_get( per_thread_packet_exdata_arrary_get(plug_mgr), idx); -} - -/******************************* - * SESSION EXDATA * - *******************************/ - -int session_exdata_set(struct session *sess, int idx, void *ex_ptr) -{ - struct exdata_runtime *sess_exdata = (struct exdata_runtime *)session_get_user_data(sess); - if(sess_exdata == NULL)return -1; - return exdata_set(sess_exdata,idx, ex_ptr); -} - -void *session_exdata_get(struct session *sess, int idx) -{ - struct exdata_runtime *sess_exdata = (struct exdata_runtime *)session_get_user_data(sess); - if(sess_exdata == NULL)return NULL; - return exdata_get(sess_exdata, idx); -} -#endif - - -#if 0 -/******************************* - * PLUGIN MANAGER SESSION RUNTIME * - *******************************/ -struct exdata_runtime *session_exdata_runtime_new(struct stellar *st) -{ - struct plugin_manager_schema *plug_mgr=stellar_get_plugin_manager(st); - return exdata_handle_new(plug_mgr->exdata_schema); -} - -void session_exdata_runtime_free(struct exdata_runtime *exdata_h) -{ - return exdata_handle_free(exdata_h); -} - -/********************************************* - * PLUGIN MANAGER PLUGIN * - *********************************************/ -UT_icd registered_plugin_array_icd = {sizeof(struct registered_plugin_schema), NULL, NULL, NULL}; - -int stellar_plugin_register(struct stellar *st, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env) -{ - struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr->registered_packet_plugin_array == NULL) - { - utarray_new(plug_mgr->registered_packet_plugin_array, ®istered_plugin_array_icd); - } - struct registered_plugin_schema packet_plugin_schema; - memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema)); - packet_plugin_schema.on_packet[PACKET_STAGE_INPUT] = on_packet_input; - packet_plugin_schema.on_packet[PACKET_STAGE_OUTPUT] = on_packet_output; - packet_plugin_schema.plugin_env = plugin_env; - utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema); - return (utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index -} - -static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt, enum packet_stage in_out) -{ - if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; - struct registered_plugin_schema *p=NULL; - - //int tid=stellar_get_current_thread_index(); - //stellar_per_stage_message_counter_set(plug_mgr, tid, 0); - while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) - { - if(p->on_packet[in_out]) - { - p->on_packet[in_out](pkt, p->plugin_env); - } - } - //stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue); - return; -} - -void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt) -{ - plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_INPUT); -} - -void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt) -{ - if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; - plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT); - //int tid=stellar_get_current_thread_index(); - //stellar_per_stage_message_counter_set(plug_mgr, tid, -1); - //stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue, - // plug_mgr->stellar_mq_schema_array); - //per_thread_packet_exdata_arrary_clean(plug_mgr); -} - -/********************************************* - * PLUGIN MANAGER POLLING PLUGIN * - *********************************************/ -UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL}; - -int stellar_on_polling_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env) -{ - struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr->registered_polling_plugin_array == NULL) - { - utarray_new(plug_mgr->registered_polling_plugin_array, ®istered_polling_plugin_array_icd); - } - struct registered_polling_plugin_schema polling_plugin_schema; - memset(&polling_plugin_schema, 0, sizeof(polling_plugin_schema)); - polling_plugin_schema.on_polling = on_polling; - polling_plugin_schema.plugin_env = plugin_env; - utarray_push_back(plug_mgr->registered_polling_plugin_array, &polling_plugin_schema); - return (utarray_len(plug_mgr->registered_polling_plugin_array)-1);// return polling plugin_id, equals to polling plugin arrary index + POLLING_PULGIN_ID_BASE -} - -int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr) -{ - if(plug_mgr==NULL || plug_mgr->registered_polling_plugin_array == NULL)return 0; - struct registered_polling_plugin_schema *p=NULL; - int polling_state=0; - while ((p = (struct registered_polling_plugin_schema *)utarray_next(plug_mgr->registered_polling_plugin_array, p))) - { - if(p->on_polling) - { - if(p->on_polling(p->plugin_env)==1) - { - polling_state=1; - } - } - } - return polling_state; -} - -#endif \ No newline at end of file diff --git a/infra/plugin_manager/plugin_manager.h b/infra/plugin_manager/plugin_manager.h deleted file mode 100644 index f2e9147..0000000 --- a/infra/plugin_manager/plugin_manager.h +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once - -#include "stellar/stellar.h" - -#ifdef __cplusplus -extern "C" -{ -#endif - -struct plugin_manager_schema; -struct plugin_manager_runtime; - -struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path); -void plugin_manager_exit(struct plugin_manager_schema *plug_mgr); - -//TODO -void *plugin_manager_get_plugin_env(const char *plugin_name); - -//void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt); -//void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt); -//return polling work state, 0: idle, 1: working -//int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr); - -#ifdef __cplusplus -} -#endif \ No newline at end of file diff --git a/infra/plugin_manager/plugin_manager_interna.h b/infra/plugin_manager/plugin_manager_interna.h deleted file mode 100644 index b918e0e..0000000 --- a/infra/plugin_manager/plugin_manager_interna.h +++ /dev/null @@ -1,33 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -#include "stellar/stellar.h" - -#include "uthash/utarray.h" - - -/******************************* - * PLUGIN MANAGER INIT & EXIT * - *******************************/ - -struct plugin_manager_schema -{ - struct stellar *st; - UT_array *plugin_load_specs_array; -}__attribute__((aligned(sizeof(void*)))); - -struct plugin_specific -{ - char plugin_name[256]; - plugin_on_load_func *load_cb; - plugin_on_unload_func *unload_cb; - void *plugin_ctx; -}__attribute__((aligned(sizeof(void*)))); - -#ifdef __cplusplus -} -#endif \ No newline at end of file diff --git a/infra/plugin_manager/test/plugin_manager_gtest_mock.h b/infra/plugin_manager/test/plugin_manager_gtest_mock.h deleted file mode 100644 index 778a0b8..0000000 --- a/infra/plugin_manager/test/plugin_manager_gtest_mock.h +++ /dev/null @@ -1,106 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -#include "plugin_manager/plugin_manager_interna.h" - -#include "stellar/session.h" -#include "tuple.h" - -//mock stellar -struct stellar -{ - struct plugin_manager_schema *plug_mgr; -}; - -enum packet_type -{ - UNKNOWN, - IPv4, - IPv6, - UDP, - TCP, - TCP_STREAM, - CONTROL, -}; - -struct packet -{ - struct stellar *st; - enum packet_type type; - unsigned char ip_proto; -}; - - -struct session -{ - struct exdata_handle *session_exdat_rt; - enum session_type type; - enum session_state state; - int sess_pkt_cnt; -}; - -struct plugin_manager_schema * stellar_get_plugin_manager(struct stellar *st) -{ - return st->plug_mgr; -} - -int stellar_set_plugin_manger(struct stellar *st, struct plugin_manager_schema *pm) -{ - st->plug_mgr=pm; - return 0; -} - -int stellar_get_worker_thread_num(struct stellar *st __attribute__((unused))) -{ - return 16; -} - -uint16_t stellar_get_current_thread_index() -{ - return 0; -} - -unsigned char packet_get_ip_protocol(struct packet *pkt) -{ - return pkt->ip_proto; -} - -enum session_type session_get_type(const struct session *sess) -{ - return sess->type; - -} - -void session_set_user_data(struct session *sess, void *user_data) -{ - sess->session_exdat_rt = (struct exdata_handle *)user_data; -} - -void *session_get_user_data(const struct session *sess) -{ - return sess->session_exdat_rt; -} - -void *packet_get_user_data(const struct packet *pkt) -{ - return pkt->st->plug_mgr; -} - -int packet_get_innermost_tuple6(const struct packet *pkt, struct tuple6 *tuple) -{ - tuple->ip_proto = pkt->ip_proto; - return 0; -} - -uint8_t packet_is_ctrl(const struct packet *pkt __attribute__((unused))) -{ - return 0; -} - -#ifdef __cplusplus -} -#endif \ No newline at end of file diff --git a/infra/stellar_core.c b/infra/stellar_core.c index f72574e..0c2a1f4 100644 --- a/infra/stellar_core.c +++ b/infra/stellar_core.c @@ -8,13 +8,14 @@ #include #include +#include "stellar/module_manager.h" + #include "utils.h" #include "packet_io.h" #include "log_private.h" #include "stellar_stat.h" #include "stellar_core.h" #include "packet_private.h" -#include "plugin_manager.h" #include "session_private.h" #include "session_manager.h" @@ -51,7 +52,8 @@ struct stellar_runtime struct logger *logger; struct stellar_stat *stat; struct packet_io *packet_io; - struct plugin_manager_schema *plug_mgr; + struct mq_schema *mq_schema; + struct stellar_module_manager *mod_mgr; struct stellar_thread threads[MAX_THREAD_NUM]; }; @@ -130,7 +132,11 @@ static void *worker_thread(void *arg) struct stellar *st = thread->st; struct stellar_runtime *runtime = &st->runtime; struct packet_io *packet_io = runtime->packet_io; - struct plugin_manager_schema *plug_mgr = runtime->plug_mgr; + struct stellar_module_manager *mod_mgr = runtime->mod_mgr; + struct mq_runtime *mq_rt = mq_runtime_new(runtime->mq_schema); + + stellar_module_manager_register_thread(mod_mgr, thread->tid, mq_rt); + struct thread_stat thr_stat = { .pkt_io = packet_io_stat(packet_io, thread->idx), .ip_reass = ip_reassembly_stat(ip_reass), @@ -145,7 +151,7 @@ static void *worker_thread(void *arg) for (int i = 0; i < RX_BURST_MAX; i++) { - packet_set_user_data(&packets[i], (void *)plug_mgr); + packet_set_user_data(&packets[i], (void *)mod_mgr); } snprintf(thd_name, sizeof(thd_name), "stellar:%d", thr_idx); @@ -292,6 +298,8 @@ static void *worker_thread(void *arg) stellar_stat_merge(runtime->stat, &thr_stat, thread->idx, UINT64_MAX); stellar_stat_print(runtime->stat, &thr_stat, thread->idx); + mq_runtime_free(mq_rt); + ATOMIC_SET(&thread->is_runing, 0); CORE_LOG_FATAL("worker thread %d exit", thr_idx); @@ -448,8 +456,9 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg CORE_LOG_ERROR("unable to create stellar stat"); goto error_out; } - runtime->plug_mgr = plugin_manager_init(st, plugin_cfg_file); - if (runtime->plug_mgr == NULL) + runtime->mq_schema=mq_schema_new(); + runtime->mod_mgr = stellar_module_manager_new(plugin_cfg_file, config->pkt_io_cfg->nr_worker_thread, runtime->mq_schema); + if (runtime->mod_mgr == NULL) { CORE_LOG_ERROR("unable to create plugin manager"); goto error_out; @@ -527,7 +536,8 @@ void stellar_free(struct stellar *st) struct stellar_config *config = &st->config; packet_io_free(runtime->packet_io); - plugin_manager_exit(runtime->plug_mgr); + stellar_module_manager_free(runtime->mod_mgr); + mq_schema_free(runtime->mq_schema); stellar_stat_free(runtime->stat); session_manager_config_free(config->sess_mgr_cfg); @@ -564,20 +574,11 @@ void stellar_reload_log_level(struct stellar *st) * Stellar Utility Function ******************************************************************************/ -struct plugin_manager_schema *stellar_get_plugin_manager(const struct stellar *st) -{ - return st->runtime.plug_mgr; -} - -void stellar_set_plugin_manger(struct stellar *st, struct plugin_manager_schema *plug_mgr) -{ - st->runtime.plug_mgr = plug_mgr; -} // only send user build packet, can't send packet which come from network void stellar_send_build_packet(struct stellar *st, struct packet *pkt) { - uint16_t thr_idx = stellar_get_current_thread_index(); + uint16_t thr_idx = stellar_module_manager_get_thread_id(st->runtime.mod_mgr); struct packet_io *packet_io = st->runtime.packet_io; struct session_manager *sess_mgr = st->runtime.threads[thr_idx].sess_mgr; session_manager_record_duplicated_packet(sess_mgr, pkt);