diff --git a/include/stellar/stellar_mq.h b/include/stellar/stellar_mq.h index b2a5f44..7856719 100644 --- a/include/stellar/stellar_mq.h +++ b/include/stellar/stellar_mq.h @@ -17,12 +17,12 @@ inline static void stellar_msg_free_default(void *msg, void *msg_free_arg __unus } typedef void on_msg_cb_func(int topic_id, const void *msg, void *plugin_env); -typedef void on_msg_dispatch_cb_func(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *sub_plugin_env); +typedef void on_msg_dispatch_cb_func(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *dispatch_arg, void *sub_plugin_env); //return topic_id -int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); +int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name); -int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); +int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); int stellar_mq_destroy_topic(struct stellar *st, int topic_id); diff --git a/infra/plugin_manager/plugin_manager.c b/infra/plugin_manager/plugin_manager.c index e847cc1..77e0bcb 100644 --- a/infra/plugin_manager/plugin_manager.c +++ b/infra/plugin_manager/plugin_manager.c @@ -346,7 +346,7 @@ int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name) return -1; } -int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array; @@ -356,12 +356,13 @@ int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_c struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id); if(t_schema == NULL)return -1; t_schema->dispatch_cb=on_dispatch_cb; + t_schema->dispatch_cb_arg=on_dispatch_arg; t_schema->free_cb=msg_free_cb; t_schema->free_cb_arg=msg_free_arg; return 0; } -int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); if(plug_mgr->stellar_mq_schema_array == NULL) @@ -379,6 +380,7 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_d t_schema.free_cb=msg_free_cb; t_schema.topic_name=(char *)topic_name; t_schema.topic_id=len;//topid_id equals arrary index + t_schema.dispatch_cb_arg=on_dispatch_arg; t_schema.free_cb_arg=msg_free_arg; t_schema.subscribers=NULL; t_schema.subscriber_cnt=0; @@ -430,8 +432,7 @@ static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt) plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx); if (plugin_schema) { - //TODO: maybe need pub_plugin_env as dispatch_cb parameter - if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, plugin_schema->plugin_env); + if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, topic->dispatch_cb_arg, plugin_schema->plugin_env); else sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, plugin_schema->plugin_env); } } diff --git a/infra/plugin_manager/plugin_manager_interna.h b/infra/plugin_manager/plugin_manager_interna.h index 7085936..9bbc763 100644 --- a/infra/plugin_manager/plugin_manager_interna.h +++ b/infra/plugin_manager/plugin_manager_interna.h @@ -27,7 +27,6 @@ struct plugin_manager_per_thread_data struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list struct stellar_message *dealth_letter_queue;// dlq list long long pub_packet_msg_cnt; - long long pub_polling_msg_cnt;//TODO }; @@ -89,12 +88,13 @@ typedef struct stellar_mq_subscriber struct stellar_mq_topic_schema { char *topic_name; - void *free_cb_arg; int topic_id; int subscriber_cnt; int is_destroyed; on_msg_dispatch_cb_func *dispatch_cb; + void *dispatch_cb_arg; stellar_msg_free_cb_func *free_cb; + void *free_cb_arg; struct stellar_mq_subscriber *subscribers; }__attribute__((aligned(sizeof(void*)))); diff --git a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp index 816caea..d2e8f73 100644 --- a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp +++ b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp @@ -102,7 +102,7 @@ TEST(plugin_manager_init, stellar_mq_topic_create_and_update) { EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name - int topic_id = stellar_mq_create_topic(&st, topic_name, NULL, test_mock_packet_msg_free, &st); + int topic_id = stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_packet_msg_free, &st); EXPECT_GE(topic_id, 0); struct stellar_mq_topic_schema *topic_schema = NULL; { @@ -116,7 +116,7 @@ TEST(plugin_manager_init, stellar_mq_topic_create_and_update) { } EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id); - EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), + EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), -1); // duplicate create, return error { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -128,7 +128,7 @@ TEST(plugin_manager_init, stellar_mq_topic_create_and_update) { EXPECT_STREQ(topic_schema->topic_name, topic_name); } - EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), 0); + EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, NULL, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), 0); { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -165,7 +165,7 @@ TEST(plugin_manager_init, stellar_mq_subscribe) { const char *topic_name="PACKET_TOPIC"; - int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, test_mock_packet_msg_free, &st); + int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_packet_msg_free, &st); EXPECT_GE(topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10),-1);//illgeal plugin_id @@ -237,7 +237,7 @@ TEST(plugin_manager_init, stellar_mq_subscribe_overwrite) { const char *topic_name="SESSION_TOPIC"; - int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, test_mock_session_msg_free, &st); + int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_session_msg_free, &st); EXPECT_GE(topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_session_msg, 10),-1);//illgeal plugin_id @@ -590,7 +590,7 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) { for(int i=0; isess[i].session_exdat_rt=session_exdata_runtime_new(st); env->sess[i].type=SESSION_TYPE_TCP; } - env->intrinsc_tcp_input_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, env); + env->intrinsc_tcp_input_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL, env); env->session_manager_plugin_id=stellar_plugin_register(st, pesudo_on_packet_input, pesudo_on_packet_output, env); } @@ -922,7 +922,7 @@ static int pesudo_tcp_session_subscribe(struct stellar *st, on_session_msg_cb_fu int topic_id=stellar_mq_get_topic_id(st, TOPIC_TCP); if(topic_id<0) { - topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL); + topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL, NULL); } return stellar_mq_subscribe(st, topic_id, (on_msg_cb_func *)on_session_cb, plugin_id); }