diff --git a/include/stellar/module.h b/include/stellar/module.h index db15bf0..f00bc65 100644 --- a/include/stellar/module.h +++ b/include/stellar/module.h @@ -6,7 +6,6 @@ extern "C" { #endif -#include "stellar/mq.h" #include "stellar/log.h" /******************************************* @@ -43,23 +42,21 @@ struct module_hooks module_on_thread_exit_func *on_thread_exit_cb; }; -struct module_manager *module_manager_new(struct module_hooks mod_specs[], size_t n_mod, int max_thread_num, const char *toml_path, struct mq_schema *mq_schema, struct logger *logger); -struct module_manager *module_manager_new_with_toml(const char *toml_path, int max_thread_num, struct mq_schema *mq_schema, struct logger *logger); +struct module_manager *module_manager_new(struct module_hooks mod_specs[], size_t n_mod, int max_thread_num, const char *toml_path, struct logger *logger); +struct module_manager *module_manager_new_with_toml(const char *toml_path, int max_thread_num, struct logger *logger); void module_manager_free(struct module_manager *mod_mgr); -void module_manager_register_thread(struct module_manager *mod_mgr, int thread_id, struct mq_runtime *mq_rt); +void module_manager_register_thread(struct module_manager *mod_mgr, int thread_id); void module_manager_unregister_thread(struct module_manager *mod_mgr, int thread_id); // return -1 on error int module_manager_get_thread_id(struct module_manager *mod_mgr); -struct mq_runtime *module_manager_get_mq_runtime(struct module_manager *mod_mgr); struct module *module_manager_get_module(struct module_manager *mod_mgr, const char *module_name); int module_manager_get_max_thread_num(struct module_manager *mod_mgr); const char *module_manager_get_toml_path(struct module_manager *mod_mgr); -struct mq_schema *module_manager_get_mq_schema(struct module_manager *mod_mgr); struct logger *module_manager_get_logger(struct module_manager *mod_mgr); /******************************************* diff --git a/include/stellar/mq.h b/include/stellar/mq.h deleted file mode 100644 index c273658..0000000 --- a/include/stellar/mq.h +++ /dev/null @@ -1,55 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -struct mq_schema; -struct mq_schema *mq_schema_new(); -void mq_schema_free(struct mq_schema *s); - -typedef void mq_msg_free_cb_func(void *msg, void *msg_free_arg); -typedef void on_msg_cb_func(int topic_id, void *msg, void *on_msg_arg); -typedef void on_msg_dispatch_cb_func(int topic_id, - void *msg, - on_msg_cb_func *on_msg_cb, - void *on_msg_cb_arg, - void *dispatch_arg); - -//return topic_id -int mq_schema_create_topic(struct mq_schema *s, - const char *topic_name, - on_msg_dispatch_cb_func *on_dispatch_cb, - void *on_dispatch_arg, - mq_msg_free_cb_func *msg_free_cb, - void *msg_free_arg); - -int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name); - -int mq_schema_update_topic(struct mq_schema *s, - int topic_id, - on_msg_dispatch_cb_func *on_dispatch_cb, - void *on_dispatch_arg, - mq_msg_free_cb_func *msg_free_cb, - void *msg_free_arg); - -int mq_schema_destroy_topic(struct mq_schema *s, int topic_id); - -//return 0 if success, otherwise return -1. -int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void * on_msg_cb_arg); - - -struct mq_runtime; -struct mq_runtime *mq_runtime_new(struct mq_schema *s); -void mq_runtime_defer(struct mq_runtime *rt); -void mq_runtime_free(struct mq_runtime *s); - -// return 0 if success, otherwise return -1 -int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *msg);// append message to pending queue -void mq_runtime_dispatch(struct mq_runtime *rt);// dispatch all message in pending queue, dispatched message will be append to dlq -void mq_runtime_clean(struct mq_runtime *rt); // free all message in dlq and pending queue, during this period, publish will be disabled - -#ifdef __cplusplus -} -#endif \ No newline at end of file diff --git a/infra/CMakeLists.txt b/infra/CMakeLists.txt index e76a214..179ca25 100644 --- a/infra/CMakeLists.txt +++ b/infra/CMakeLists.txt @@ -1,4 +1,4 @@ -set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager monitor) +set(INFRA exdata tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager monitor) set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml ringbuf) #set(DECODERS lpi_plus) set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS}) diff --git a/infra/module_manager/CMakeLists.txt b/infra/module_manager/CMakeLists.txt index 131cf93..dddde31 100644 --- a/infra/module_manager/CMakeLists.txt +++ b/infra/module_manager/CMakeLists.txt @@ -3,6 +3,6 @@ target_include_directories(module_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/include/) target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/) target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/) -target_link_libraries(module_manager PUBLIC toml mq ${CMAKE_DL_LIBS}) +target_link_libraries(module_manager PUBLIC toml ${CMAKE_DL_LIBS}) add_subdirectory(test) \ No newline at end of file diff --git a/infra/module_manager/module_manager.c b/infra/module_manager/module_manager.c index 600327d..a8a3233 100644 --- a/infra/module_manager/module_manager.c +++ b/infra/module_manager/module_manager.c @@ -13,11 +13,10 @@ * module manager API * *******************************************/ -struct module_manager *module_manager_new(struct module_hooks mod_hooks[], size_t n_mod, int max_thread_num, const char *toml_path, struct mq_schema *mq_schema, struct logger *logger) +struct module_manager *module_manager_new(struct module_hooks mod_hooks[], size_t n_mod, int max_thread_num, const char *toml_path, struct logger *logger) { struct module_manager *mod_mgr = CALLOC(struct module_manager, 1); mod_mgr->config.max_thread_num=max_thread_num; - mod_mgr->config.mq_schema=mq_schema; mod_mgr->config.logger=logger; if(toml_path)mod_mgr->config.toml_path=strdup(toml_path); @@ -47,22 +46,22 @@ struct module_manager *module_manager_new(struct module_hooks mod_hooks[], size_ return mod_mgr; } -struct module_manager *module_manager_new_with_toml(const char *toml_path, int max_thread_num, struct mq_schema *mq_schema, struct logger *logger) +struct module_manager *module_manager_new_with_toml(const char *toml_path, int max_thread_num, struct logger *logger) { FILE *fp=fopen(toml_path, "r"); - if(!fp)return module_manager_new(NULL, 0, max_thread_num, toml_path, mq_schema, logger); + if(!fp)return module_manager_new(NULL, 0, max_thread_num, toml_path,logger); toml_table_t *conf = toml_parse_file(fp, NULL, 0); fclose(fp); - if(conf==NULL)return module_manager_new(NULL, 0, max_thread_num, toml_path, mq_schema, logger); + if(conf==NULL)return module_manager_new(NULL, 0, max_thread_num, toml_path, logger); toml_array_t* mod_array = toml_array_in(conf, "module"); if(mod_array==NULL) { toml_free(conf); - return module_manager_new(NULL, 0, max_thread_num, toml_path, mq_schema, logger); + return module_manager_new(NULL, 0, max_thread_num, toml_path, logger); } int mod_num = toml_array_nelem(mod_array); @@ -128,7 +127,7 @@ struct module_manager *module_manager_new_with_toml(const char *toml_path, int m } toml_free(conf); - return module_manager_new(mod_hooks, mod_num, max_thread_num, toml_path, mq_schema, logger); + return module_manager_new(mod_hooks, mod_num, max_thread_num, toml_path, logger); } @@ -164,11 +163,6 @@ int module_manager_get_max_thread_num(struct module_manager*mod_mgr) return mod_mgr->config.max_thread_num; } -struct mq_schema *module_manager_get_mq_schema(struct module_manager *mod_mgr) -{ - if(mod_mgr==NULL)return NULL; - return mod_mgr->config.mq_schema; -} struct logger *module_manager_get_logger(struct module_manager *mod_mgr) { @@ -195,11 +189,9 @@ struct mq_runtime *module_manager_get_mq_runtime(struct module_manager *mod_mgr return local_mq_rt; } -void module_manager_register_thread(struct module_manager* mod_mgr, int thread_id, struct mq_runtime *mq_rt) +void module_manager_register_thread(struct module_manager* mod_mgr, int thread_id) { local_thread_id=thread_id; - local_mq_rt=mq_rt; - for(int i=0; in_descriptor; i++) { if(mod_mgr->descriptors[i].mod == NULL)break; diff --git a/infra/module_manager/module_manager_interna.h b/infra/module_manager/module_manager_interna.h index 1b27bd7..1022da5 100644 --- a/infra/module_manager/module_manager_interna.h +++ b/infra/module_manager/module_manager_interna.h @@ -7,10 +7,7 @@ extern "C" #include "stellar/module.h" -#include "stellar/mq.h" - #include - #include struct module @@ -43,7 +40,6 @@ struct module_manager { char *toml_path; int max_thread_num; - struct mq_schema *mq_schema; struct logger *logger; }config; diff --git a/infra/module_manager/test/gtest_module_manager_main.cpp b/infra/module_manager/test/gtest_module_manager_main.cpp index 32d5f0c..7d9760b 100644 --- a/infra/module_manager/test/gtest_module_manager_main.cpp +++ b/infra/module_manager/test/gtest_module_manager_main.cpp @@ -23,7 +23,6 @@ const char *gtest_mock_spec_toml = TEST(module_manager_internal, stellar_module_manager_new_with_toml) { - struct mq_schema *mq_schema=NULL; char toml_template[] = "./stellar.toml.XXXXXX"; int fd = mkstemp(toml_template); @@ -31,16 +30,14 @@ TEST(module_manager_internal, stellar_module_manager_new_with_toml) { write(fd, gtest_mock_spec_toml, strlen(gtest_mock_spec_toml)); close(fd); - struct module_manager *mod_mgr=module_manager_new_with_toml(toml_template, 10, mq_schema, NULL); + struct module_manager *mod_mgr=module_manager_new_with_toml(toml_template, 10, NULL); EXPECT_TRUE(mod_mgr!=NULL); EXPECT_TRUE(module_manager_get_module(mod_mgr, "test")==NULL); EXPECT_EQ(module_manager_get_max_thread_num(mod_mgr), 10); - EXPECT_EQ(module_manager_get_mq_schema(mod_mgr), mq_schema); EXPECT_STREQ(module_manager_get_toml_path(mod_mgr), toml_template); EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1);// no thread registered - EXPECT_TRUE(module_manager_get_mq_runtime(mod_mgr)==NULL); module_manager_free(mod_mgr); @@ -71,14 +68,11 @@ TEST(stellar_module, basic_new_and_free) { TEST(stellar_module_manager, new_with_null_toml) { - struct mq_schema *mq_schema=NULL; - struct module_manager *mod_mgr = module_manager_new_with_toml(NULL, 10, mq_schema, NULL); + struct module_manager *mod_mgr = module_manager_new_with_toml(NULL, 10, NULL); EXPECT_TRUE(mod_mgr!=NULL); EXPECT_TRUE(module_manager_get_module(mod_mgr, "test")==NULL); EXPECT_EQ(module_manager_get_max_thread_num(mod_mgr), 10); - EXPECT_EQ(module_manager_get_mq_schema(mod_mgr), mq_schema); - EXPECT_TRUE(module_manager_get_mq_runtime(mod_mgr)==NULL); EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1);// no thread registered module_manager_free(mod_mgr); @@ -86,41 +80,32 @@ TEST(stellar_module_manager, new_with_null_toml) { TEST(stellar_module_manager, new_with_empty_toml) { - struct mq_schema *mq_schema=NULL; - struct module_manager *mod_mgr = module_manager_new_with_toml("/dev/null", 10, mq_schema, NULL); + struct module_manager *mod_mgr = module_manager_new_with_toml("/dev/null", 10, NULL); EXPECT_TRUE(mod_mgr!=NULL); EXPECT_TRUE(module_manager_get_module(mod_mgr, "test")==NULL); EXPECT_EQ(module_manager_get_max_thread_num(mod_mgr), 10); - EXPECT_EQ(module_manager_get_mq_schema(mod_mgr), mq_schema); EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1);// no thread registered - EXPECT_TRUE(module_manager_get_mq_runtime(mod_mgr)==NULL); module_manager_free(mod_mgr); } TEST(stellar_module_manager, register_thread) { - struct mq_schema *mq_schema=(struct mq_schema*)1; - struct module_manager *mod_mgr=module_manager_new_with_toml(NULL, 10, mq_schema, NULL); + struct module_manager *mod_mgr=module_manager_new_with_toml(NULL, 10, NULL); EXPECT_TRUE(mod_mgr!=NULL); - EXPECT_EQ((long)module_manager_get_mq_schema(mod_mgr), 1); EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1);// no thread registered - EXPECT_TRUE(module_manager_get_mq_runtime(mod_mgr)==NULL); - struct mq_runtime *mq_rt = (struct mq_runtime*)2; - module_manager_register_thread(mod_mgr, 1, mq_rt); + module_manager_register_thread(mod_mgr, 1); EXPECT_EQ(module_manager_get_thread_id(mod_mgr), 1); - EXPECT_EQ((long)module_manager_get_mq_runtime(mod_mgr), 2); module_manager_unregister_thread(mod_mgr, 1); EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1); - EXPECT_EQ((long)module_manager_get_mq_runtime(mod_mgr), 0); module_manager_free(mod_mgr); @@ -147,7 +132,6 @@ extern "C" void gtest_module_exit(struct module_manager *mod_mgr, struct module EXPECT_EQ(module_manager_get_module(mod_mgr, "gtest"), mod); EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1); - EXPECT_EQ((long)module_manager_get_mq_runtime(mod_mgr), 0); module_free(mod); } @@ -174,7 +158,6 @@ const char *gtest_module_spec_toml = TEST(module_manager, basic_module) { - struct mq_schema *mq_schema=(struct mq_schema *)1; char toml_template[] = "./stellar.toml.XXXXXX"; int fd = mkstemp(toml_template); @@ -182,24 +165,20 @@ TEST(module_manager, basic_module) { write(fd, gtest_module_spec_toml, strlen(gtest_module_spec_toml)); close(fd); - struct module_manager *mod_mgr=module_manager_new_with_toml(toml_template, 10, mq_schema, NULL); + struct module_manager *mod_mgr=module_manager_new_with_toml(toml_template, 10, NULL); EXPECT_TRUE(mod_mgr!=NULL); EXPECT_TRUE(module_manager_get_module(mod_mgr, "gtest")!=NULL); EXPECT_EQ(module_manager_get_max_thread_num(mod_mgr), 10); - EXPECT_EQ((long)module_manager_get_mq_schema(mod_mgr), 1); EXPECT_STREQ(module_manager_get_toml_path(mod_mgr), toml_template); - struct mq_runtime *mq_rt = (struct mq_runtime*)2; - module_manager_register_thread(mod_mgr, 1, mq_rt); + module_manager_register_thread(mod_mgr, 1); EXPECT_EQ((long)module_manager_get_thread_id(mod_mgr), 1); - EXPECT_EQ((long)module_manager_get_mq_runtime(mod_mgr), 2); module_manager_unregister_thread(mod_mgr, 1); EXPECT_EQ((long)module_manager_get_thread_id(mod_mgr), -1); - EXPECT_EQ((long)module_manager_get_mq_runtime(mod_mgr), 0); module_manager_free(mod_mgr); unlink(toml_template); @@ -230,25 +209,20 @@ struct test_module_polling_env TEST(module_manager, basic_polling_module) { - struct mq_schema *mq_schema=mq_schema_new(); - - struct module_manager *mod_mgr=module_manager_new_with_toml(NULL, 10, mq_schema, NULL); + struct module_manager *mod_mgr=module_manager_new_with_toml(NULL, 10, NULL); EXPECT_TRUE(mod_mgr!=NULL); EXPECT_EQ(module_manager_get_max_thread_num(mod_mgr), 10); - EXPECT_EQ(module_manager_get_mq_schema(mod_mgr), mq_schema); struct test_module_polling_env env={}; env.N_round=10; module_manager_register_polling_node(mod_mgr, test_module_on_polling, &env); - struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); - module_manager_register_thread(mod_mgr, 1, mq_rt); + module_manager_register_thread(mod_mgr, 1); EXPECT_EQ((long)module_manager_get_thread_id(mod_mgr), 1); - EXPECT_EQ(module_manager_get_mq_runtime(mod_mgr), mq_rt); for(int i=0; itopic_name = src->topic_name ? strdup(src->topic_name) : NULL; -} - -static void mq_topic_schema_dtor(void *_elt) -{ - struct mq_topic *elt = (struct mq_topic *)_elt; - if (elt->topic_name) - FREE(elt->topic_name); - // FREE(elt); // free the item -} - -UT_icd mq_topic_schema_icd = {sizeof(struct mq_topic), NULL, mq_topic_schema_copy, mq_topic_schema_dtor}; - -int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name) -{ - if(topic_name == NULL || s == NULL || s->topic_array == NULL )return -1; - unsigned int len = utarray_len(s->topic_array); - struct mq_topic *topic; - for(unsigned int i = 0; i < len; i++) - { - topic = (struct mq_topic *)utarray_eltptr(s->topic_array, i); - if(strcmp(topic->topic_name, topic_name) == 0) - { - return i; - } - } - return -1; -} - -static struct mq_topic *mq_schema_get_topic(struct mq_schema *s, int topic_id) -{ - if(s==NULL || s->topic_array == NULL || topic_id < 0)return NULL; - unsigned int len = utarray_len(s->topic_array); - if (len <= (unsigned int)topic_id)return NULL; - return (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); -} - -int mq_schema_update_topic(struct mq_schema *s, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg) -{ - struct mq_topic *topic = mq_schema_get_topic(s, topic_id); - if(topic == NULL)return -1; - topic->dispatch_cb=on_dispatch_cb; - topic->dispatch_cb_arg=on_dispatch_arg; - topic->free_cb=msg_free_cb; - topic->free_cb_arg=msg_free_arg; - return 0; -} - -int mq_schema_create_topic(struct mq_schema *s, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg) -{ - if(s==NULL)return -1; - if(s->topic_array == NULL) - { - utarray_new(s->topic_array, &mq_topic_schema_icd); - } - unsigned int len = utarray_len(s->topic_array); - if(mq_schema_get_topic_id(s, topic_name) >= 0) - { - return -1; - } - struct mq_topic topic={}; - topic.dispatch_cb=on_dispatch_cb; - topic.free_cb=msg_free_cb; - topic.topic_name=(char *)topic_name; - topic.topic_id=len;//topid_id equals arrary index - topic.dispatch_cb_arg=on_dispatch_arg; - topic.free_cb_arg=msg_free_arg; - topic.subscribers=NULL; - topic.subscriber_cnt=0; - utarray_push_back(s->topic_array, &topic); - s->mq_topic_num+=1; - return topic.topic_id; -} - -int mq_schema_destroy_topic(struct mq_schema *s, int topic_id) -{ - struct mq_topic *topic = mq_schema_get_topic(s, topic_id); - if(topic == NULL)return -1; - if (topic->is_destroyed == 1)return 0; - - struct mq_subscriber *sub_elt, *sub_tmp; - DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) - { - DL_DELETE(topic->subscribers, sub_elt); - FREE(sub_elt); - } - topic->is_destroyed = 1; - s->mq_topic_num-=1; - return 1; // success -} - - -static int mq_dispatch_one_message(struct mq_topic *topic, struct mq_message *mq_elt) -{ - struct mq_subscriber *sub_elt, *sub_tmp; - if(topic==NULL || mq_elt==NULL)return -1; - DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) - { - if (sub_elt->msg_cb) - { - if (topic->dispatch_cb) - topic->dispatch_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb, sub_elt->msg_cb_arg, - topic->dispatch_cb_arg); - else - sub_elt->msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb_arg); - } - } - return 0; -} - - - -int mq_runtime_publish_message_immediate(struct mq_runtime *rt, int topic_id, void *msg) -{ - if(rt==NULL || rt->schema == NULL)return -1; - - struct mq_topic *topic = mq_schema_get_topic(rt->schema, topic_id); - if(topic==NULL)return -1; - - struct mq_message mq_elt; - mq_elt.rt=rt; - mq_elt.header.topic_id = topic_id; - mq_elt.body = msg; - mq_dispatch_one_message(topic, &mq_elt); - if (topic->free_cb) - { - topic->free_cb(mq_elt.body, topic->free_cb_arg); - } - return 0; -} - -void mq_runtime_clean(struct mq_runtime *rt) -{ - if(rt==NULL)return; - - struct mq_message *mq_elt, *tmp; - struct mq_topic *topic; - rt->is_cleaning=true; - - for (int i = 0; i < MQ_MAX; i++) - { - DL_FOREACH_SAFE(rt->mq[i], mq_elt, tmp) - { - topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id)); - if (topic && topic->free_cb) - { - topic->free_cb(mq_elt->body, topic->free_cb_arg); - } - DL_DELETE(rt->mq[i], mq_elt); - rt->mq_len[i] -= 1; - FREE(mq_elt); - } - } - rt->is_cleaning=false; -} - -void mq_runtime_dispatch(struct mq_runtime *rt) -{ - struct mq_topic *topic=NULL; - struct mq_message *mq_elt=NULL, *mq_tmp=NULL; - while (rt->mq_len[MQ_MAILBOX]) - { - DL_FOREACH_SAFE(rt->mq[MQ_MAILBOX], mq_elt, mq_tmp) - { - DL_DELETE(rt->mq[MQ_MAILBOX], mq_elt); - rt->mq_len[MQ_MAILBOX] -= 1; - topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id)); - mq_dispatch_one_message(topic, mq_elt); - if (rt->defer_enabled==true) - { - DL_APPEND(rt->mq[MQ_DEATH_LETTER], mq_elt); // move to dlq list - rt->mq_len[MQ_DEATH_LETTER] += 1; - } - else - { - if(topic->free_cb)topic->free_cb(mq_elt->body, topic->free_cb_arg); - FREE(mq_elt); - } - } - } - mq_runtime_clean(rt); - return; -} - -//return 0 if success, otherwise return -1. -int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void *on_msg_cb_arg) -{ - struct mq_topic *topic = mq_schema_get_topic(s, topic_id); - if(topic==NULL)return -1; - - struct mq_subscriber *new_subscriber = CALLOC(struct mq_subscriber,1); - new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; - new_subscriber->msg_cb = on_msg_cb; - new_subscriber->msg_cb_arg = on_msg_cb_arg; - DL_APPEND(topic->subscribers, new_subscriber); - - topic->subscriber_cnt+=1; - s->mq_topic_subscriber_num+=1; - return 0; -} - -int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *data) -{ - if(rt==NULL || rt->schema == NULL)return -1; - if(rt->is_cleaning==true)return -1; - - struct mq_topic *topic = mq_schema_get_topic(rt->schema, topic_id); - if(topic==NULL)return -1; - - struct mq_message *msg= CALLOC(struct mq_message,1); - msg->rt=rt; - msg->header.topic_id = topic_id; - msg->body = data; - DL_APPEND(rt->mq[MQ_MAILBOX], msg); - rt->mq_len[MQ_MAILBOX]+=1; - - if(rt->defer_enabled==false) - { - mq_runtime_dispatch(rt); - } - - return 0; -} - -struct mq_schema *mq_schema_new() -{ - struct mq_schema *s = CALLOC(struct mq_schema,1); - return s; -} - -void mq_schema_free(struct mq_schema *s) -{ - if(s==NULL)return; - if(s->topic_array) - { - for (unsigned int i = 0; i < utarray_len(s->topic_array); i++) - { - mq_schema_destroy_topic(s, i); - } - utarray_free(s->topic_array); - } - FREE(s); - return; -} - -struct mq_runtime *mq_runtime_new(struct mq_schema *s) -{ - if(s==NULL)return NULL; - struct mq_runtime *rt = CALLOC(struct mq_runtime,1); - rt->schema=s; - rt->is_cleaning=false; - return rt; -} - -void mq_runtime_defer(struct mq_runtime *rt) -{ - if(rt==NULL)return; - rt->defer_enabled=true; -} - -void mq_runtime_free(struct mq_runtime *rt) -{ - if(rt==NULL)return; - mq_runtime_clean(rt); - FREE(rt); -} - diff --git a/infra/mq/mq_internal.h b/infra/mq/mq_internal.h deleted file mode 100644 index d0cb695..0000000 --- a/infra/mq/mq_internal.h +++ /dev/null @@ -1,75 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -#include "stellar/mq.h" -#include "uthash/utarray.h" - -#include - -struct mq_message -{ - struct mq_runtime *rt; - struct - { - int topic_id; - } header; - void *body; - struct mq_message *next, *prev; -} __attribute__((aligned(sizeof(void *)))); - -typedef struct mq_subscriber -{ - int topic_subscriber_idx; - int plugin_idx; - on_msg_cb_func *msg_cb; - void *msg_cb_arg; - struct mq_subscriber *next, *prev; -}stellar_mq_subscriber __attribute__((aligned(sizeof(void*)))); - - -struct mq_topic -{ - char *topic_name; - int topic_id; - int subscriber_cnt; - int is_destroyed; - on_msg_dispatch_cb_func *dispatch_cb; - void *dispatch_cb_arg; - mq_msg_free_cb_func *free_cb; - void *free_cb_arg; - struct mq_subscriber *subscribers; -}__attribute__((aligned(sizeof(void*)))); - -struct mq_schema -{ - UT_array *topic_array; - int mq_topic_num; - int mq_topic_subscriber_num; -}; - - -enum mq_property -{ - MQ_MAILBOX = 0, - MQ_DEATH_LETTER = 1, - MQ_MAX, -}; - -struct mq_runtime -{ - struct mq_schema *schema; - struct mq_message *mq[MQ_MAX];// message queue - size_t mq_len[MQ_MAX]; - bool is_cleaning; - bool defer_enabled; -}; - -int mq_runtime_publish_message_immediate(struct mq_runtime *rt, int topic_id, void *msg); - -#ifdef __cplusplus -} -#endif \ No newline at end of file diff --git a/infra/mq/test/CMakeLists.txt b/infra/mq/test/CMakeLists.txt deleted file mode 100644 index 6b72d4e..0000000 --- a/infra/mq/test/CMakeLists.txt +++ /dev/null @@ -1,14 +0,0 @@ -add_executable(gtest_mq gtest_mq_main.cpp) - -target_include_directories(gtest_mq PRIVATE ${CMAKE_SOURCE_DIR}/infra/) - -target_link_libraries( - gtest_mq - mq - "-rdynamic" - gtest - gmock -) - -include(GoogleTest) -gtest_discover_tests(gtest_mq) \ No newline at end of file diff --git a/infra/mq/test/gtest_mq_main.cpp b/infra/mq/test/gtest_mq_main.cpp deleted file mode 100644 index 6c8d5c8..0000000 --- a/infra/mq/test/gtest_mq_main.cpp +++ /dev/null @@ -1,917 +0,0 @@ -#pragma GCC diagnostic ignored "-Wunused-parameter" - -#include - -#include "mq/mq_internal.h" - -#include "stellar/utils.h" - -#define TOPIC_NAME_MAX 512 - -/******************************************* - * TEST MQ SCHEMA * - *******************************************/ - -TEST(mq_schema, new_and_free) { - - struct mq_schema *s = mq_schema_new(); - EXPECT_TRUE(s!=NULL); - mq_schema_free(s); -} - -void mock_msg_free(void *msg, void *msg_free_arg){} -void mock_overwrite_msg_free(void *msg, void *msg_free_arg){} - -TEST(mq_schema, mq_topic_create_and_update) { - - struct mq_schema *s = mq_schema_new(); - EXPECT_TRUE(s!=NULL); - - const char *topic_name="PACKET_TOPIC"; - EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), -1); // illegal topic_name - - int topic_id = mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s); - EXPECT_GE(topic_id, 0); - - struct mq_topic *topic = NULL; - { - SCOPED_TRACE("White-box test, check mq_schema internal "); - topic = - (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); - EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); - EXPECT_EQ(topic->free_cb_arg, s); - EXPECT_EQ(topic->topic_id, topic_id); - EXPECT_STREQ(topic->topic_name, topic_name); - } - - EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), topic_id); - EXPECT_EQ(mq_schema_create_topic(s, topic_name, NULL, NULL, mock_overwrite_msg_free, s), -1); // duplicate create, return error - - { - SCOPED_TRACE("White-box test, check stellar internal schema"); - topic = - (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); - EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); - EXPECT_EQ(topic->free_cb_arg, s); - EXPECT_EQ(topic->topic_id, topic_id); - EXPECT_STREQ(topic->topic_name, topic_name); - } - - EXPECT_EQ(mq_schema_update_topic(s, topic_id, NULL, NULL, mock_overwrite_msg_free, s), 0); - - { - SCOPED_TRACE("White-box test, check stellar internal schema"); - topic = - (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); - EXPECT_EQ(topic->free_cb, (void *)mock_overwrite_msg_free); - EXPECT_EQ(topic->free_cb_arg, s); - EXPECT_EQ(topic->topic_id, topic_id); - EXPECT_STREQ(topic->topic_name, topic_name); - EXPECT_EQ(utarray_len(s->topic_array), 1); - } - - EXPECT_EQ(mq_schema_destroy_topic(s, 10), -1); // illgeal topic_id - - EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 1); - EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 0); // duplicate destroy, return 0; - - { - SCOPED_TRACE("White-box test, check stellar internal schema"); - EXPECT_EQ(utarray_len(s->topic_array), 1); // destory won't delete the topic schema - } - - mq_schema_free(s); -} - -void test_mock_on_packet_msg(int topic_id, void *msg, void *sub_arg){} -void test_mock_overwrite_on_packet_msg(int topic_id, void *msg, void *sub_arg){} - -TEST(mq_schema, subscribe) { - - struct mq_schema *s = mq_schema_new(); - EXPECT_TRUE(s!=NULL); - - const char *topic_name="PACKET_TOPIC"; - - int topic_id=mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s); - EXPECT_GE(topic_id, 0); - - EXPECT_EQ(mq_schema_subscribe(s, 10, test_mock_on_packet_msg, s),-1);//illgeal topic_id - - EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_on_packet_msg, s),0); - EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_overwrite_on_packet_msg, s),0);//duplicate subscribe - - struct mq_topic *topic; - { - SCOPED_TRACE("White-box test, check stellar internal schema"); - topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); - EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); - EXPECT_EQ(topic->free_cb_arg, s); - EXPECT_EQ(topic->topic_id, topic_id); - EXPECT_STREQ(topic->topic_name, topic_name); - } - - EXPECT_EQ(topic->subscriber_cnt, 2); - EXPECT_EQ(topic->subscribers->msg_cb, (void *)test_mock_on_packet_msg); - EXPECT_EQ(topic->subscribers->next->msg_cb, (void *)test_mock_overwrite_on_packet_msg); - - mq_schema_free(s); -} - -/******************************************* - * TEST MQ RUNTIME * - *******************************************/ - -TEST(mq_runtime, new_and_free) { - - struct mq_schema *s = mq_schema_new(); - EXPECT_TRUE(s!=NULL); - - struct mq_runtime *rt = mq_runtime_new(s); - EXPECT_TRUE(rt!=NULL); - - mq_runtime_free(rt); - mq_schema_free(s); - -} - -struct test_pub_and_clean_env -{ - struct mq_schema *s; - struct mq_runtime *rt; - int N_round; - int current_round; - int topic_id; - int on_msg_free_called; - int on_msg_called; -}; - -void test_pub_and_clean_free(void *msg, void *msg_free_arg) -{ - struct test_pub_and_clean_env *env = (struct test_pub_and_clean_env *)msg_free_arg; - env->on_msg_free_called+=1; - FREE(msg); - return; -} - -void test_pub_and_clean_on_msg(int topic_id, void *msg, void *sub_arg) -{ - struct test_pub_and_clean_env *env = (struct test_pub_and_clean_env *)sub_arg; - env->on_msg_called+=1; - return; -} - -TEST(mq_runtime, defer_pub_then_clean) { - - struct test_pub_and_clean_env env={}; - env.s = mq_schema_new(); - EXPECT_TRUE(env.s!=NULL); - - env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_pub_and_clean_free , &env); - EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_pub_and_clean_on_msg, &env), 0); - - env.N_round=10; - env.rt=mq_runtime_new(env.s); - EXPECT_TRUE(env.rt!=NULL); - - mq_runtime_defer(env.rt); - - for(int i=0; imsg_free_cnt+=1; - return; -} - -static void test_mq_on_packet_topic_msg(int topic_id, void *msg, void *sub_arg) -{ - struct mock_packet_mq_env *env = (struct mock_packet_mq_env *)sub_arg; - EXPECT_TRUE(env!=NULL); - env->msg_sub_cnt+=1; - return; -} - -static void test_mq_on_packet_in_out(int topic_id, void *msg, void *sub_arg) -{ - struct mock_packet_mq_env *env = (struct mock_packet_mq_env *)sub_arg; - EXPECT_TRUE(env!=NULL); - int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); - for(int i=0; irt, env->packet_topic_id[i], msg), 0); - env->msg_pub_cnt+=1; - } - return; -} - -TEST(mq_runtime, basic_pub_sub) { - - struct mq_schema *s = mq_schema_new(); - EXPECT_TRUE(s!=NULL); - - struct mock_packet_mq_env env; - memset(&env, 0, sizeof(struct mock_packet_mq_env)); - env.s=s; - char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; - - int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); - - - - for(int i=0; itopic_array, env.packet_topic_id[i]); - EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func); - EXPECT_EQ(topic->free_cb_arg, &env); - EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); - EXPECT_STREQ(topic->topic_name, topic_name[i]); - } - } - - { - SCOPED_TRACE("White-box test, check stellar internal schema"); - EXPECT_EQ(utarray_len(s->topic_array), topic_id_num); - } - - int topic_sub_num=(int)(sizeof(env.packet_mq_sub_module_id) / sizeof(env.packet_mq_sub_module_id[0])); - - for (int i = 0; i < topic_sub_num; i++) - { - for(int j = 0; j < topic_id_num; j++) - { - EXPECT_EQ(mq_schema_subscribe(s, env.packet_topic_id[j], test_mq_on_packet_topic_msg, &env), 0); - } - } - - - int packet_in_topic_id=mq_schema_create_topic(s, "PACKET_IN", NULL, NULL, NULL, &env); - int packet_out_topic_id=mq_schema_create_topic(s, "PACKET_OUT", NULL, NULL, NULL, &env); - - mq_schema_subscribe(s, packet_in_topic_id, test_mq_on_packet_in_out, &env); - mq_schema_subscribe(s, packet_out_topic_id, test_mq_on_packet_in_out, &env); - struct mock_packet_message pkt={6, packet_in_topic_id, packet_out_topic_id}; - - struct mq_runtime *rt = mq_runtime_new(s); - EXPECT_TRUE(rt!=NULL); - env.rt=rt; - - int N_packet=1; - for (int i = 0; i < N_packet; i++) - { - mq_runtime_publish_message(rt, packet_in_topic_id, &pkt); - mq_runtime_dispatch(rt); - mq_runtime_publish_message(rt, packet_out_topic_id, &pkt); - mq_runtime_dispatch(rt); - } - - mq_runtime_free(rt); - mq_schema_free(s); - - EXPECT_EQ(N_packet*2*topic_id_num, env.msg_pub_cnt); - EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); - EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); - - -} - -struct test_pub_on_free_env -{ - mq_schema *s; - mq_runtime *rt; - int N_round; - int current_round; - int topic_id; - int on_msg_free_called; - int on_msg_called; -}; - - -static void test_pub_on_msg_free(void *msg, void *msg_free_arg) -{ - struct test_pub_on_free_env *env = (struct test_pub_on_free_env *)msg_free_arg; - env->on_msg_free_called+=1; - if((long)msg!=env->N_round && (int)(long)msg==env->N_round-1) - { - EXPECT_EQ(mq_runtime_publish_message(env->rt, env->topic_id, (void *)(long)(env->N_round)), -1);//on message free, publish always failed - EXPECT_EQ(mq_runtime_publish_message_immediate(env->rt, env->topic_id, (void *)(long)(env->N_round)), 0);//on message free, publish at once success - } - return; -} - -static void test_pub_on_free_on_msg(int topic_id, void *msg, void *sub_arg) -{ - struct test_pub_on_free_env *env = (struct test_pub_on_free_env *)sub_arg; - env->on_msg_called+=1; - if((int)(long)msg==env->N_round) - { - EXPECT_EQ(env->on_msg_called, env->N_round+1); - EXPECT_EQ(env->current_round, env->N_round-1); - } - return; -} - -TEST(mq_runtime, pub_on_msg_free) -{ - struct test_pub_on_free_env env={}; - env.s=mq_schema_new(); - EXPECT_TRUE(env.s!=NULL); - env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_pub_on_msg_free , &env); - EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_pub_on_free_on_msg, &env), 0); - - env.N_round=10; - env.rt=mq_runtime_new(env.s); - EXPECT_TRUE(env.rt!=NULL); - mq_runtime_defer(env.rt); - for(int i=0; ion_msg_free_called+=1; - return; -} - -static void test_dispatch_on_msg(int topic_id, void *msg, void *sub_arg) -{ - struct test_dispatch_on_msg_env *env = (struct test_dispatch_on_msg_env *)sub_arg; - env->on_msg_called+=1; - if(env->current_round==(long)msg) - { - EXPECT_EQ(mq_runtime_publish_message(env->rt, env->topic_id, (void*)(long)env->N_round), 0); - } - else - { - EXPECT_EQ((long)msg, env->N_round); - } - mq_runtime_dispatch(env->rt); - return; -} - -TEST(mq_runtime, call_dispatch_when_dispatch) -{ - struct test_dispatch_on_msg_env env={}; - env.s=mq_schema_new(); - EXPECT_TRUE(env.s!=NULL); - env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_dispatch_on_msg_free , &env); - EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_dispatch_on_msg, &env), 0); - - env.N_round=10; - env.rt=mq_runtime_new(env.s); - EXPECT_TRUE(env.rt!=NULL); - for(int i=0; isess_dispatch_called+=1; -} - -static void mock_tcp_session_msg_free(void *msg, void *msg_free_arg) -{ - struct mock_session_mq_env *env=(struct mock_session_mq_env *)msg_free_arg; - env->sess_msg_free_called+=1; - -} -static int mock_tcp_session_subscribe(struct mock_session_mq_env *env, mock_on_session_msg_cb_func *on_session_cb) -{ - int topic_id=mq_schema_get_topic_id(env->s, TOPIC_TCP); - if(topic_id<0) - { - topic_id=mq_schema_create_topic(env->s, TOPIC_TCP, mock_on_msg_dispatch, NULL, mock_tcp_session_msg_free, env); - } - return mq_schema_subscribe(env->s, topic_id, (on_msg_cb_func *)on_session_cb, env); -} - -static void test_basic_on_tcp_session(int topic_id, struct mock_session_message *sess, void *sub_arg) -{ - struct mock_session_mq_env *env = (struct mock_session_mq_env *)sub_arg; - EXPECT_TRUE(env!=NULL); - if(sess) - { - env->basic_on_tcp_called+=1; - } - return; -} - -TEST(mq_runtime, sub_with_dispatch_cb) { - - struct mock_session_mq_env env; - memset(&env, 0, sizeof(env)); - env.N_session=10; - - env.s=mq_schema_new(); - - EXPECT_EQ(mock_tcp_session_subscribe(&env, test_basic_on_tcp_session), 0); - env.intrinsc_tcp_input_topic_id=mq_schema_get_topic_id(env.s, TOPIC_TCP); - - env.rt=mq_runtime_new(env.s); - - for(int i=0; i plugin_id_1_called+=1; - if(topic_id == env->tcp_topic_id) - { - EXPECT_EQ(env->plugin_id_1_called%3, 1);// tcp msg has high priority - if((long)msg%2==0) - { - EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_LOW), 0); - } - else - { - EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_HIGH), 0); - } - } - if(topic_id == env->test_mq_topic_id) - { - if (env->current_round % 2 == 0) - { - if (env->plugin_id_1_called % 3 == 2) - { - EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority - } - if (env->plugin_id_1_called % 3 == 0) - { - EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority - } - } - else - { - if (env->plugin_id_1_called % 3 == 2) - { - EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority - } - if (env->plugin_id_1_called % 3 == 0) - { - EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority - } - } - } - return; -} - -static void test_session_mq_priority_plugin_2_on_msg(int topic_id, void *msg, void *plugin_env) -{ - struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env; - - env->plugin_id_2_called+=1; - if(topic_id == env->tcp_topic_id) - { - EXPECT_EQ(env->plugin_id_2_called % 3, 1); - // tcp msg has normal priority - EXPECT_EQ(mq_runtime_publish_message(env->rt, env->test_mq_topic_id, (void *)(long)2), 0); - } - if(topic_id == env->test_mq_topic_id) - { - if (env->current_round % 2 == 0) - { - if (env->plugin_id_2_called % 3 == 2) - { - EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority - } - if (env->plugin_id_2_called % 3 == 0) - { - EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority - } - } - else - { - if (env->plugin_id_2_called % 3 == 2) - { - EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority - } - if (env->plugin_id_2_called % 3 == 0) - { - EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority - } - } - } - return; -} - -TEST(mq_runtime, basic_mq_priority) { - - struct test_priority_mq_env env={}; - - env.s=mq_schema_new(); - - env.tcp_topic_id=mq_schema_create_topic(env.s, TOPIC_TCP, NULL, NULL, NULL, &env); - EXPECT_GE(env.tcp_topic_id, 0); - - EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0); - EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0); - - env.test_mq_topic_id=mq_schema_create_topic(env.s, "SESSION_PRIORITY_TOPIC", NULL, NULL, NULL, &env); - EXPECT_GE(env.test_mq_topic_id, 0); - EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0); - EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0); - -// mock packet and session - - env.rt=mq_runtime_new(env.s); - env.N_round=10; - - for (int i = 0; i < env.N_round; i++) - { - env.current_round=i; - mq_runtime_publish_message(env.rt, env.tcp_topic_id, (void *)(long)i); - mq_runtime_dispatch(env.rt); - - } - - mq_runtime_free(env.rt); - mq_schema_free(env.s); - - // publish TCP TOPIC N_round, and SESSION_PRIORITY_TOPIC*2 - EXPECT_EQ(env.plugin_id_1_called,env.N_round*3); - EXPECT_EQ(env.plugin_id_2_called,env.N_round*3); -} -#endif - -struct test_polling_module -{ - int mod_id; - int called; -}; - -struct test_mock_polling_env -{ - struct mq_schema *s; - struct mq_runtime *rt; - int N_round; - int current_round; - int polling_topic_id; - int polling_dispatch_called; - int mod_num; - struct test_polling_module mod[1024]; -}; - -#define TOPIC_POLLING "POLLING" - -typedef void mock_on_polling_cb_func(void *polling_arg); - -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wcast-function-type" - -static void mock_on_polling_dispatch(int topic_id, - void *msg, - on_msg_cb_func* on_msg_cb, - void *on_msg_cb_arg, - void *dispatch_arg) -{ - mock_on_polling_cb_func *polling = (mock_on_polling_cb_func *)on_msg_cb; - polling(on_msg_cb_arg); - struct test_mock_polling_env *env=(struct test_mock_polling_env *)dispatch_arg; - env->polling_dispatch_called+=1; -} - -static int mock_polling_subscribe(struct test_mock_polling_env *env, mock_on_polling_cb_func *on_polling, void *polling_arg) -{ - int topic_id=mq_schema_get_topic_id(env->s, TOPIC_POLLING); - if(topic_id<0) - { - topic_id=mq_schema_create_topic(env->s, TOPIC_POLLING, mock_on_polling_dispatch, env, NULL, NULL); - } - return mq_schema_subscribe(env->s, topic_id, (on_msg_cb_func *)on_polling, polling_arg); -} - -#pragma GCC diagnostic pop - -static int mock_polling_work(struct test_mock_polling_env *env) -{ - mq_runtime_publish_message(env->rt, env->polling_topic_id, NULL); - return 0; -} - - - -static void mock_on_polling(void *polling_arg) -{ - struct test_polling_module *mod = (struct test_polling_module *)polling_arg; - mod->called+=1; - if(mod->mod_id==0 && mod->called==2) - { - struct test_mock_polling_env *env=container_of((const test_polling_module (*)[1024])polling_arg, struct test_mock_polling_env, mod); - mock_polling_work(env); - } - return; -} - -TEST(mq_runtime, polling) -{ - struct test_mock_polling_env env; - memset(&env, 0, sizeof(env)); - env.s=mq_schema_new(); - env.mod_num=10; - - for(int i=0; i < env.mod_num;i++) - { - env.mod[i].mod_id=i; - EXPECT_EQ(mock_polling_subscribe(&env, mock_on_polling, &env.mod[i]), 0); - } - - env.polling_topic_id=mq_schema_get_topic_id(env.s, TOPIC_POLLING); - env.rt=mq_runtime_new(env.s); - - env.N_round=10; - for(int i=0; i msg_free_cnt+=1; - FREE(msg); - return; -} - -static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env) -{ - struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; - EXPECT_TRUE(env!=NULL); - env->msg_sub_cnt+=1; - return; -} - -static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env) -{ - struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; - EXPECT_TRUE(env!=NULL); - //EXPECT_EQ(pkt->ip_proto, ip_protocol); - int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); - unsigned int cnt=0; - int *msg; - for(int i=0; iplug_mgr->max_message_dispatch; j++) - { - msg=CALLOC(int, 1); - *msg=cnt; - int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg); - if(cnt < env->plug_mgr->max_message_dispatch) - { - ASSERT_EQ(pub_ret, 0); - env->msg_pub_cnt+=1; - } - else - { - ASSERT_EQ(pub_ret, -1); - } - if(pub_ret!=0)FREE(msg); - cnt+=1; - } - } - return; -} - -TEST(mq_runtime, pub_overlimit) { - - struct stellar st={0}; - struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); - whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); - - unsigned char ip_proto=6; - struct packet_plugin_env env; - memset(&env, 0, sizeof(struct packet_plugin_env)); - env.plug_mgr=plug_mgr; - char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; - - int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); - - for(int i=0; istellar_mq_schema_array, env.packet_topic_id[i]); - EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func); - EXPECT_EQ(topic->free_cb_arg, &env); - EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); - EXPECT_STREQ(topic->topic_name, topic_name[i]); - } - } - - { - SCOPED_TRACE("White-box test, check stellar internal schema"); - EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); - } - - int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env); - EXPECT_GE(pub_plugin_id, 0); - - int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); - - for (int i = 0; i < topic_sub_num; i++) - { - env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok - EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0); - for(int j = 0; j < topic_id_num; j++) - { - EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); - } - } - - { - SCOPED_TRACE("White-box test, check stellar internal schema"); - EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); - } - - struct packet pkt={&st, IPv4, ip_proto}; - - int N_packet=10; - for (int i = 0; i < N_packet; i++) - { - plugin_manager_on_packet_input(plug_mgr, &pkt); - plugin_manager_on_packet_output(plug_mgr, &pkt); - } - - plugin_manager_exit(plug_mgr); - EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt); - EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); - EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); -} - - -#endif - - -/********************************************** - * GTEST MAIN * - **********************************************/ - -int main(int argc, char ** argv) -{ - int ret=0; - ::testing::InitGoogleTest(&argc, argv); - ret=RUN_ALL_TESTS(); - return ret; -} \ No newline at end of file diff --git a/infra/packet_manager/CMakeLists.txt b/infra/packet_manager/CMakeLists.txt index 05dc5ef..592a31d 100644 --- a/infra/packet_manager/CMakeLists.txt +++ b/infra/packet_manager/CMakeLists.txt @@ -12,6 +12,6 @@ target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/logger) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/include) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra) -target_link_libraries(packet_manager tuple logger dablooms mq exdata module_manager fieldstat4) +target_link_libraries(packet_manager tuple logger dablooms exdata module_manager fieldstat4) add_subdirectory(test) \ No newline at end of file diff --git a/infra/session_manager/CMakeLists.txt b/infra/session_manager/CMakeLists.txt index ae68fdc..044ca6d 100644 --- a/infra/session_manager/CMakeLists.txt +++ b/infra/session_manager/CMakeLists.txt @@ -14,6 +14,6 @@ add_library(session_manager target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/) target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/include) -target_link_libraries(session_manager timeout packet_manager tcp_reassembly mq exdata fieldstat4 monitor) +target_link_libraries(session_manager timeout packet_manager tcp_reassembly exdata fieldstat4 monitor) add_subdirectory(test) \ No newline at end of file diff --git a/infra/stellar_core.c b/infra/stellar_core.c index 52d9501..3a18cf2 100644 --- a/infra/stellar_core.c +++ b/infra/stellar_core.c @@ -35,7 +35,6 @@ struct stellar uint64_t need_exit; struct logger *logger; struct packet_io *pkt_io; - struct mq_schema *mq_schema; struct module_manager *mod_mgr; struct thread threads[MAX_THREAD_NUM]; }; @@ -51,7 +50,6 @@ static void *worker_thread(void *arg) struct stellar *st = thread->st; struct packet_io *pkt_io = st->pkt_io; struct module_manager *mod_mgr = st->mod_mgr; - struct mq_runtime *mq_rt = mq_runtime_new(st->mq_schema); struct module *pkt_mgr_mod = module_manager_get_module(mod_mgr, PACKET_MANAGER_MODULE_NAME); struct packet_manager *pkt_mgr = module_to_packet_manager(pkt_mgr_mod); @@ -59,7 +57,7 @@ static void *worker_thread(void *arg) prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); __thread_local_logger = st->logger; - module_manager_register_thread(mod_mgr, thread_id, mq_rt); + module_manager_register_thread(mod_mgr, thread_id); ATOMIC_SET(&thread->is_runing, 1); CORE_LOG_FATAL("worker thread %d runing", thread_id); @@ -90,7 +88,6 @@ static void *worker_thread(void *arg) CORE_LOG_FATAL("worker thread %d cleaning", thread_id); module_manager_unregister_thread(mod_mgr, thread_id); - mq_runtime_free(mq_rt); ATOMIC_SET(&thread->is_runing, 0); CORE_LOG_FATAL("worker thread %d exit", thread_id); @@ -218,14 +215,7 @@ struct stellar *stellar_new(const char *toml_file) goto error_out; } - st->mq_schema = mq_schema_new(); - if (st->mq_schema == NULL) - { - CORE_LOG_ERROR("unable to create mq schema"); - goto error_out; - } - - st->mod_mgr = module_manager_new(mod_hooks, count_of(mod_hooks),st->thread_num, toml_file, st->mq_schema, st->logger); + st->mod_mgr = module_manager_new(mod_hooks, count_of(mod_hooks),st->thread_num, toml_file, st->logger); if (st->mod_mgr == NULL) { CORE_LOG_ERROR("unable to create packet manager"); @@ -287,7 +277,6 @@ void stellar_free(struct stellar *st) { packet_io_free(st->pkt_io); module_manager_free(st->mod_mgr); - mq_schema_free(st->mq_schema); CORE_LOG_FATAL("stellar exit\n"); log_free(st->logger);