diff --git a/include/stellar/stellar_mq.h b/include/stellar/stellar_mq.h index 4780990..b2a5f44 100644 --- a/include/stellar/stellar_mq.h +++ b/include/stellar/stellar_mq.h @@ -16,10 +16,13 @@ inline static void stellar_msg_free_default(void *msg, void *msg_free_arg __unus if(msg)FREE(msg); } +typedef void on_msg_cb_func(int topic_id, const void *msg, void *plugin_env); +typedef void on_msg_dispatch_cb_func(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *sub_plugin_env); + //return topic_id -int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); +int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name); -int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); +int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); int stellar_mq_destroy_topic(struct stellar *st, int topic_id); @@ -31,9 +34,9 @@ enum stellar_mq_priority STELLAR_MQ_PRIORITY_MAX, }; -typedef void on_msg_cb_func(int topic_id, const void *msg, void *plugin_env); + //return 0 if success, otherwise return -1. -int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id); +int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *on_msg_cb, int plugin_id); int stellar_mq_publish_message(struct stellar *st, int topic_id, void *msg); int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *msg, enum stellar_mq_priority priority); diff --git a/infra/plugin_manager/plugin_manager.c b/infra/plugin_manager/plugin_manager.c index 21af4a3..05e72f1 100644 --- a/infra/plugin_manager/plugin_manager.c +++ b/infra/plugin_manager/plugin_manager.c @@ -149,14 +149,14 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array); if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array); - if(plug_mgr->registered_packet_plugin_array) + if(plug_mgr->registered_plugin_array) { - struct registered_packet_plugin_schema *s = NULL; - while ((s = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s))) + struct registered_plugin_schema *s = NULL; + while ((s = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_plugin_array, s))) { - if(s->registed_packet_mq_subscriber_info)utarray_free(s->registed_packet_mq_subscriber_info); + if(s->registed_mq_subscriber_info)utarray_free(s->registed_mq_subscriber_info); } - utarray_free(plug_mgr->registered_packet_plugin_array); + utarray_free(plug_mgr->registered_plugin_array); } plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st); FREE(plug_mgr); @@ -314,11 +314,7 @@ static void stellar_mq_topic_schema_copy(void *_dst, const void *_src) { struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst, *src = (struct stellar_mq_topic_schema *)_src; - dst->subscribers = src->subscribers; - dst->free_cb = src->free_cb; - dst->free_cb_arg = src->free_cb_arg; - dst->topic_id = src->topic_id; - dst->subscriber_cnt = src->subscriber_cnt; + memcpy(_dst, _src, sizeof(struct stellar_mq_topic_schema)); dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL; } @@ -351,7 +347,7 @@ int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name) return -1; } -int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array; @@ -360,12 +356,13 @@ int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_c if(len < (unsigned int)topic_id)return -1; struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id); if(t_schema == NULL)return -1; + t_schema->dispatch_cb=on_dispatch_cb; t_schema->free_cb=msg_free_cb; t_schema->free_cb_arg=msg_free_arg; return 0; } -int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); if(plug_mgr->stellar_mq_schema_array == NULL) @@ -379,6 +376,7 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_ } struct stellar_mq_topic_schema t_schema; memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema)); + t_schema.dispatch_cb=on_dispatch_cb; t_schema.free_cb=msg_free_cb; t_schema.topic_name=(char *)topic_name; t_schema.topic_id=len;//topid_id equals arrary index @@ -415,60 +413,12 @@ int stellar_mq_destroy_topic(struct stellar *st, int topic_id) return 1; // success } -UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL}; - -static int __stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topic_id, void *plugin_on_msg_cb, int plugin_idx, UT_array *registed_mq_subscriber_info) -{ - if(plug_mgr == NULL || plug_mgr->stellar_mq_schema_array==NULL || registed_mq_subscriber_info == NULL)return -1; - - unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); - if (len <= (unsigned int)topic_id)return -1; - - struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); - if(topic==NULL)return -1; - - // if plugin already subscribe current topic, return 0 - struct stellar_mq_subscriber_info *p=NULL; - while( (p=(struct stellar_mq_subscriber_info *)utarray_next(registed_mq_subscriber_info,p))) - { - if(p->topic_id==topic_id) - { - struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers; - int cnt=0; - while(tmp_subscriber) - { - if(cnt==p->subscriber_idx) - { - tmp_subscriber->plugin_msg_cb=plugin_on_msg_cb; - return 0; - } - cnt++; - tmp_subscriber=tmp_subscriber->next; - } - } - }; - - struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); - new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; - new_subscriber->plugin_idx = plugin_idx; - new_subscriber->plugin_msg_cb = plugin_on_msg_cb; - DL_APPEND(topic->subscribers, new_subscriber); - - struct stellar_mq_subscriber_info sub_info; - memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info)); - sub_info.topic_id=topic_id; - sub_info.subscriber_idx=topic->subscriber_cnt; - utarray_push_back(registed_mq_subscriber_info, &sub_info); - topic->subscriber_cnt+=1; - plug_mgr->mq_topic_subscriber_num+=1; - return 0; -} static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt) { struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)stellar_get_plugin_manager(mq_elt->st); struct stellar_mq_subscriber *sub_elt, *sub_tmp; - struct registered_packet_plugin_schema *packet_plugin_schema; + struct registered_plugin_schema *plugin_schema; struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)(mq_elt->header.topic_id)); if (topic) @@ -477,11 +427,13 @@ static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt) { if (sub_elt->plugin_msg_cb) { - packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr( - plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx); - if (packet_plugin_schema) + plugin_schema = (struct registered_plugin_schema *)utarray_eltptr( + plug_mgr->registered_plugin_array, (unsigned int)sub_elt->plugin_idx); + if (plugin_schema) { - sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, packet_plugin_schema->plugin_env); + //TODO: maybe need pub_plugin_env as dispatch_cb parameter + if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, plugin_schema->plugin_env); + else sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, plugin_schema->plugin_env); } } } @@ -529,9 +481,7 @@ static void stellar_mq_free(struct stellar_message **head, UT_array *mq_schema_a } } -/******************************* - * PACKET MQ * - *******************************/ +UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL}; //return 0 if success, otherwise return -1. int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id) @@ -539,17 +489,57 @@ int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugi int plugin_idx=plugin_id; struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1; + if(plug_mgr == NULL || plug_mgr->registered_plugin_array == NULL)return -1; - struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx); - if(packet_plugin_schema==NULL)return -1; + struct registered_plugin_schema *plugin_schema = (struct registered_plugin_schema *)utarray_eltptr(plug_mgr->registered_plugin_array, (unsigned)plugin_idx); + if(plugin_schema==NULL)return -1; - if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL) + unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); + if (len <= (unsigned int)topic_id)return -1; + + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + if(topic==NULL)return -1; + + if(plugin_schema->registed_mq_subscriber_info==NULL) { - utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd); + utarray_new(plugin_schema->registed_mq_subscriber_info, &stellar_mq_subscriber_info_icd); } - return __stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_idx, packet_plugin_schema->registed_packet_mq_subscriber_info); + // if plugin already subscribe current topic, return 0 + struct stellar_mq_subscriber_info *p=NULL; + while( (p=(struct stellar_mq_subscriber_info *)utarray_next(plugin_schema->registed_mq_subscriber_info,p))) + { + if(p->topic_id==topic_id) + { + struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers; + int cnt=0; + while(tmp_subscriber) + { + if(cnt==p->subscriber_idx) + { + tmp_subscriber->plugin_msg_cb=plugin_on_msg_cb; + return 0; + } + cnt++; + tmp_subscriber=tmp_subscriber->next; + } + } + }; + + struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); + new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; + new_subscriber->plugin_idx = plugin_idx; + new_subscriber->plugin_msg_cb = plugin_on_msg_cb; + DL_APPEND(topic->subscribers, new_subscriber); + + struct stellar_mq_subscriber_info sub_info; + memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info)); + sub_info.topic_id=topic_id; + sub_info.subscriber_idx=topic->subscriber_cnt; + utarray_push_back(plugin_schema->registed_mq_subscriber_info, &sub_info); + topic->subscriber_cnt+=1; + plug_mgr->mq_topic_subscriber_num+=1; + return 0; } int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *data, enum stellar_mq_priority priority) @@ -618,31 +608,31 @@ void session_exdata_runtime_free(struct stellar_exdata *exdata_rt) /********************************************* - * PLUGIN MANAGER PACKET PLUGIN * + * PLUGIN MANAGER PLUGIN * *********************************************/ -UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL}; +UT_icd registered_plugin_array_icd = {sizeof(struct registered_plugin_schema), NULL, NULL, NULL}; int stellar_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env) { struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr->registered_packet_plugin_array == NULL) + if(plug_mgr->registered_plugin_array == NULL) { - utarray_new(plug_mgr->registered_packet_plugin_array, ®istered_packet_plugin_array_icd); + utarray_new(plug_mgr->registered_plugin_array, ®istered_plugin_array_icd); } - struct registered_packet_plugin_schema packet_plugin_schema; + struct registered_plugin_schema packet_plugin_schema; memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema)); packet_plugin_schema.ip_protocol = ip_proto; packet_plugin_schema.on_packet[PACKET_STAGE_INPUT] = on_packet_input; packet_plugin_schema.on_packet[PACKET_STAGE_OUTPUT] = on_packet_output; packet_plugin_schema.plugin_env = plugin_env; - utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema); - return (utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index + utarray_push_back(plug_mgr->registered_plugin_array, &packet_plugin_schema); + return (utarray_len(plug_mgr->registered_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index } static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt, enum packet_stage in_out) { - if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; - struct registered_packet_plugin_schema *p=NULL; + if(plug_mgr==NULL || plug_mgr->registered_plugin_array == NULL || pkt == NULL)return; + struct registered_plugin_schema *p=NULL; //TODO: get innermost layer ip protocol by packet api struct tuple6 t6; @@ -652,7 +642,7 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str int tid=stellar_get_current_thread_index(); //TODO : provide public api to reset pub_msg_cnt plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt - while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) + while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_plugin_array, p))) { if(p->ip_protocol == ip_proto && p->on_packet[in_out]) { @@ -670,7 +660,7 @@ void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, stru void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt) { - if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; + if(plug_mgr == NULL || plug_mgr->registered_plugin_array == NULL || pkt == NULL)return; plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT); int tid=stellar_get_current_thread_index(); plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish diff --git a/infra/plugin_manager/plugin_manager_interna.h b/infra/plugin_manager/plugin_manager_interna.h index 0929956..7dc224e 100644 --- a/infra/plugin_manager/plugin_manager_interna.h +++ b/infra/plugin_manager/plugin_manager_interna.h @@ -37,7 +37,7 @@ struct plugin_manager_schema UT_array *plugin_load_specs_array; UT_array *stellar_exdata_schema_array; UT_array *stellar_mq_schema_array; - UT_array *registered_packet_plugin_array; + UT_array *registered_plugin_array; UT_array *registered_polling_plugin_array; int stellar_mq_topic_num; int mq_topic_subscriber_num; @@ -93,6 +93,7 @@ struct stellar_mq_topic_schema int topic_id; int subscriber_cnt; int is_destroyed; + on_msg_dispatch_cb_func *dispatch_cb; stellar_msg_free_cb_func *free_cb; struct stellar_mq_subscriber *subscribers; }__attribute__((aligned(sizeof(void*)))); @@ -105,12 +106,12 @@ enum packet_stage PACKET_STAGE_MAX }; -struct registered_packet_plugin_schema +struct registered_plugin_schema { char ip_protocol; plugin_on_packet_func *on_packet[PACKET_STAGE_MAX]; void *plugin_env; - UT_array *registed_packet_mq_subscriber_info; + UT_array *registed_mq_subscriber_info; }__attribute__((aligned(sizeof(void*)))); struct registered_polling_plugin_schema diff --git a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp index 89b7927..2b8436c 100644 --- a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp +++ b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp @@ -27,7 +27,7 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p //registered plugin array null EXPECT_TRUE(plug_mgr->registered_polling_plugin_array==NULL); - EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL); + EXPECT_TRUE(plug_mgr->registered_plugin_array==NULL); EXPECT_TRUE(plug_mgr->per_thread_data!=NULL); int thread_num=stellar_get_worker_thread_num(st); @@ -102,7 +102,7 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) { EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name - int topic_id = stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st); + int topic_id = stellar_mq_create_topic(&st, topic_name, NULL, test_mock_packet_msg_free, &st); EXPECT_GE(topic_id, 0); struct stellar_mq_topic_schema *topic_schema = NULL; { @@ -116,7 +116,7 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) { } EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id); - EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_packet_msg_free, plug_mgr), + EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), -1); // duplicate create, return error { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -128,7 +128,7 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) { EXPECT_STREQ(topic_schema->topic_name, topic_name); } - EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_packet_msg_free, plug_mgr), 0); + EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), 0); { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -166,7 +166,7 @@ TEST(plugin_manager_init, packet_mq_subscribe) { const char *topic_name="PACKET_TOPIC"; - int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st); + int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, test_mock_packet_msg_free, &st); EXPECT_GE(topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10),-1);//illgeal plugin_id @@ -246,7 +246,7 @@ TEST(plugin_manager, packet_plugin_illegal_exdata) { { SCOPED_TRACE("White-box test, check stellar internal schema"); - int packet_plugin_num = utarray_len(plug_mgr->registered_packet_plugin_array); + int packet_plugin_num = utarray_len(plug_mgr->registered_plugin_array); EXPECT_EQ(packet_plugin_num, 1); } @@ -292,7 +292,7 @@ TEST(plugin_manager, packet_plugins_with_proto_filter) { { SCOPED_TRACE("White-box test, check stellar internal schema"); - EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), proto_filter_plugin_num); + EXPECT_EQ(utarray_len(plug_mgr->registered_plugin_array), proto_filter_plugin_num); } struct packet pkt={&st, IPv4, 0}; @@ -420,7 +420,7 @@ TEST(plugin_manager, packet_plugins_share_exdata) { { SCOPED_TRACE("White-box test, check stellar internal schema"); - EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), 2); // Fix plugin number + EXPECT_EQ(utarray_len(plug_mgr->registered_plugin_array), 2); // Fix plugin number } struct packet pkt={&st, IPv4, ip_proto}; @@ -490,7 +490,7 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) { for(int i=0; iregistered_packet_plugin_array), topic_sub_num+1); + EXPECT_EQ(utarray_len(plug_mgr->registered_plugin_array), topic_sub_num+1); } struct packet pkt={&st, IPv4, ip_proto}; @@ -607,7 +607,7 @@ TEST(plugin_manager, packet_plugins_pub_overlimit) { for(int i=0; iregistered_packet_plugin_array), topic_sub_num+1); + EXPECT_EQ(utarray_len(plug_mgr->registered_plugin_array), topic_sub_num+1); } struct packet pkt={&st, IPv4, ip_proto}; @@ -712,7 +712,7 @@ TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) { EXPECT_GE(plugin_id, 0); env.packet_exdata_idx[0]=stellar_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env); - env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", test_exdata_free_pub_msg_free, &env); + env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", NULL, test_exdata_free_pub_msg_free, &env); EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0); @@ -776,7 +776,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) { EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1);// illegal topic_name - int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st); + int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, test_mock_session_msg_free, &st); EXPECT_GE(topic_id, 0); struct stellar_mq_topic_schema *topic_schema; { @@ -790,7 +790,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) { } EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id); - EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_session_msg_free, plug_mgr), -1); // duplicate create, return error + EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, NULL, test_mock_overwrite_session_msg_free, plug_mgr), -1); // duplicate create, return error { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -802,7 +802,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) { EXPECT_STREQ(topic_schema->topic_name, topic_name); } - EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_session_msg_free, plug_mgr), 0); + EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, NULL, test_mock_overwrite_session_msg_free, plug_mgr), 0); { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -842,7 +842,7 @@ TEST(plugin_manager_init, session_mq_subscribe_overwrite) { const char *topic_name="SESSION_TOPIC"; - int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st); + int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, test_mock_session_msg_free, &st); EXPECT_GE(topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_session_msg, 10),-1);//illgeal plugin_id @@ -1014,11 +1014,11 @@ TEST(plugin_manager, session_plugin_on_intrinsic_ingress_egress) { int plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); EXPECT_GE(plugin_id, 0); - env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL); + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_basic_on_session_ingress, plugin_id), 0); - env.intrinsc_egress_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_OUTPUT, NULL, NULL); + env.intrinsc_egress_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_OUTPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_egress_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_ingress, plugin_id), 0);// Intentional error @@ -1122,11 +1122,11 @@ TEST(plugin_manager, DISABLED_session_plugin_ignore_on_ctx_new_sub_other_msg) { env.test_mq_pub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL,&env); EXPECT_GE(env.test_mq_pub_plugin_id, 0); - env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL); + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_mq_pub_on_session, env.test_mq_pub_plugin_id), 0); - env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_session_msg_free, &env); + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, test_session_msg_free, &env); EXPECT_GE(env.test_mq_topic_id, 0); env.test_mq_sub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); @@ -1288,11 +1288,11 @@ TEST(plugin_manager,DISABLED_session_plugin_pub_msg_overlimt) { env.test_mq_pub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); EXPECT_GE(env.test_mq_pub_plugin_id, 0); - env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL); + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0); - env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_overlimit_session_msg_free, &env); + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, test_overlimit_session_msg_free, &env); EXPECT_GE(env.test_mq_topic_id, 0); env.test_mq_sub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); @@ -1399,11 +1399,11 @@ TEST(plugin_manager, DISABLED_session_plugin_on_ctx_new_then_dettach) { int plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); EXPECT_GE(plugin_id,0); - env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL); + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_dettach_on_session, plugin_id), 0); - env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_dettach_msg_free, &env); + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, test_dettach_msg_free, &env); EXPECT_GE(env.test_mq_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_dettach_on_session, plugin_id), 0); @@ -1486,11 +1486,11 @@ TEST(plugin_manager, DISABLED_session_plugin_pub_on_ctx_free) { int plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); EXPECT_GE(plugin_id,0); - env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL); + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_invalid_pub_msg_on_session, plugin_id), 0); - env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, &env); + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, NULL, &env); EXPECT_GE(env.test_mq_topic_id, 0); // pesudo packet and session @@ -1608,11 +1608,11 @@ TEST(plugin_manager, DISABLED_session_plugin_pub_msg_on_closing) { int plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env); EXPECT_GE(plugin_id,0); - env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL); + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0); - env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, &env); + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, NULL, &env); EXPECT_GE(env.test_mq_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_closing_on_userdefine_msg, plugin_id), 0); @@ -1729,7 +1729,7 @@ TEST(plugin_manager, DISABLED_test_session_mq_topic_is_active) { env.plugin_id_1=plugin_id_1; env.plugin_id_2=plugin_id_2; - env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL); + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_2_on_msg, plugin_id_2), 0); @@ -1828,7 +1828,7 @@ TEST(plugin_manager, DISABLED_test_session_dettach) { env.plugin_id_1=plugin_id_1; env.plugin_id_2=plugin_id_2; - env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL); + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_2_on_msg, plugin_id_2), 0); @@ -1974,7 +1974,7 @@ TEST(plugin_manager, test_session_mq_priority) { env.exdata_ctx_1_id=stellar_exdata_new_index(&st, "SESSION_CTX_1", stellar_exdata_free_default, &env) ; env.exdata_ctx_2_id=stellar_exdata_new_index(&st, "SESSION_CTX_2", stellar_exdata_free_default, &env) ; - env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL); + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); @@ -2063,7 +2063,7 @@ TEST(plugin_manager, session_exdata_free_pub_msg) { env.plugin_id_1=stellar_plugin_register(&st, 0, NULL, NULL, &env); EXPECT_GE(env.plugin_id_1,0); - env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL); + env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_exdata_free_pub_msg_on_session, env.plugin_id_1), 0);