feat(stellar mq topic api): add dispatch_cb_arg for dispatch_cb

This commit is contained in:
yangwei
2024-09-06 13:31:29 +08:00
parent 442586ef52
commit cc542dc365
4 changed files with 21 additions and 20 deletions

View File

@@ -17,12 +17,12 @@ inline static void stellar_msg_free_default(void *msg, void *msg_free_arg __unus
}
typedef void on_msg_cb_func(int topic_id, const void *msg, void *plugin_env);
typedef void on_msg_dispatch_cb_func(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *sub_plugin_env);
typedef void on_msg_dispatch_cb_func(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *dispatch_arg, void *sub_plugin_env);
//return topic_id
int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name);
int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
int stellar_mq_destroy_topic(struct stellar *st, int topic_id);

View File

@@ -346,7 +346,7 @@ int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name)
return -1;
}
int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
@@ -356,12 +356,13 @@ int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_c
struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id);
if(t_schema == NULL)return -1;
t_schema->dispatch_cb=on_dispatch_cb;
t_schema->dispatch_cb_arg=on_dispatch_arg;
t_schema->free_cb=msg_free_cb;
t_schema->free_cb_arg=msg_free_arg;
return 0;
}
int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->stellar_mq_schema_array == NULL)
@@ -379,6 +380,7 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_d
t_schema.free_cb=msg_free_cb;
t_schema.topic_name=(char *)topic_name;
t_schema.topic_id=len;//topid_id equals arrary index
t_schema.dispatch_cb_arg=on_dispatch_arg;
t_schema.free_cb_arg=msg_free_arg;
t_schema.subscribers=NULL;
t_schema.subscriber_cnt=0;
@@ -430,8 +432,7 @@ static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt)
plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
if (plugin_schema)
{
//TODO: maybe need pub_plugin_env as dispatch_cb parameter
if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, plugin_schema->plugin_env);
if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, topic->dispatch_cb_arg, plugin_schema->plugin_env);
else sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, plugin_schema->plugin_env);
}
}

View File

@@ -27,7 +27,6 @@ struct plugin_manager_per_thread_data
struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list
struct stellar_message *dealth_letter_queue;// dlq list
long long pub_packet_msg_cnt;
long long pub_polling_msg_cnt;//TODO
};
@@ -89,12 +88,13 @@ typedef struct stellar_mq_subscriber
struct stellar_mq_topic_schema
{
char *topic_name;
void *free_cb_arg;
int topic_id;
int subscriber_cnt;
int is_destroyed;
on_msg_dispatch_cb_func *dispatch_cb;
void *dispatch_cb_arg;
stellar_msg_free_cb_func *free_cb;
void *free_cb_arg;
struct stellar_mq_subscriber *subscribers;
}__attribute__((aligned(sizeof(void*))));

View File

@@ -102,7 +102,7 @@ TEST(plugin_manager_init, stellar_mq_topic_create_and_update) {
EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name
int topic_id = stellar_mq_create_topic(&st, topic_name, NULL, test_mock_packet_msg_free, &st);
int topic_id = stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_packet_msg_free, &st);
EXPECT_GE(topic_id, 0);
struct stellar_mq_topic_schema *topic_schema = NULL;
{
@@ -116,7 +116,7 @@ TEST(plugin_manager_init, stellar_mq_topic_create_and_update) {
}
EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id);
EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, NULL, test_mock_overwrite_packet_msg_free, plug_mgr),
EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_overwrite_packet_msg_free, plug_mgr),
-1); // duplicate create, return error
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -128,7 +128,7 @@ TEST(plugin_manager_init, stellar_mq_topic_create_and_update) {
EXPECT_STREQ(topic_schema->topic_name, topic_name);
}
EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), 0);
EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, NULL, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -165,7 +165,7 @@ TEST(plugin_manager_init, stellar_mq_subscribe) {
const char *topic_name="PACKET_TOPIC";
int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, test_mock_packet_msg_free, &st);
int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_packet_msg_free, &st);
EXPECT_GE(topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10),-1);//illgeal plugin_id
@@ -237,7 +237,7 @@ TEST(plugin_manager_init, stellar_mq_subscribe_overwrite) {
const char *topic_name="SESSION_TOPIC";
int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, test_mock_session_msg_free, &st);
int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_session_msg_free, &st);
EXPECT_GE(topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_session_msg, 10),-1);//illgeal plugin_id
@@ -590,7 +590,7 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) {
for(int i=0; i<topic_id_num; i++)
{
sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL, test_packet_msg_free_cb_func, &env);
env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL, NULL, test_packet_msg_free_cb_func, &env);
EXPECT_GE(env.packet_topic_id[i], 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -707,7 +707,7 @@ TEST(plugin_manager, packet_plugins_pub_overlimit) {
for(int i=0; i<topic_id_num; i++)
{
sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL, overlimit_packet_msg_free_cb_func, &env);
env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL,NULL,overlimit_packet_msg_free_cb_func, &env);
EXPECT_GE(env.packet_topic_id[i], 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -812,7 +812,7 @@ TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) {
EXPECT_GE(plugin_id, 0);
env.packet_exdata_idx[0]=stellar_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env);
env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", NULL, test_exdata_free_pub_msg_free, &env);
env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", NULL, NULL, test_exdata_free_pub_msg_free, &env);
EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0);
@@ -872,7 +872,7 @@ struct session_manager_plugin_env
typedef void on_session_msg_cb_func(int topic_id, struct session *sess, void *plugin_env);
static void pesudo_on_msg_dispatch(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *sub_plugin_env)
static void pesudo_on_msg_dispatch(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *dispatch_arg, void *sub_plugin_env)
{
on_session_msg_cb_func *session_cb = (on_session_msg_cb_func *)on_msg_cb;
struct session *sess=(struct session *)msg;
@@ -904,7 +904,7 @@ static void pesudo_session_load(struct stellar *st, struct session_manager_plugi
env->sess[i].session_exdat_rt=session_exdata_runtime_new(st);
env->sess[i].type=SESSION_TYPE_TCP;
}
env->intrinsc_tcp_input_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, env);
env->intrinsc_tcp_input_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL, env);
env->session_manager_plugin_id=stellar_plugin_register(st, pesudo_on_packet_input, pesudo_on_packet_output, env);
}
@@ -922,7 +922,7 @@ static int pesudo_tcp_session_subscribe(struct stellar *st, on_session_msg_cb_fu
int topic_id=stellar_mq_get_topic_id(st, TOPIC_TCP);
if(topic_id<0)
{
topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL);
topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL, NULL);
}
return stellar_mq_subscribe(st, topic_id, (on_msg_cb_func *)on_session_cb, plugin_id);
}