From 87944eb1153db3c39c330acb057a31d611d2b459 Mon Sep 17 00:00:00 2001 From: yangwei Date: Thu, 19 Sep 2024 15:58:39 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(mq):=20mq=5Fruntime=5Fclean=20?= =?UTF-8?q?in=20mq.h?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/stellar/mq.h | 14 +- infra/mq/mq.c | 47 ++-- infra/mq/mq_internal.h | 17 +- infra/mq/test/gtest_mq_main.cpp | 368 +++++++++++++++++++++----------- 4 files changed, 289 insertions(+), 157 deletions(-) diff --git a/include/stellar/mq.h b/include/stellar/mq.h index 0afc201..aa7b416 100644 --- a/include/stellar/mq.h +++ b/include/stellar/mq.h @@ -9,10 +9,6 @@ struct mq_schema; struct mq_schema *mq_schema_new(); void mq_schema_free(struct mq_schema *s); -struct mq_runtime; -struct mq_runtime *mq_runtime_new(struct mq_schema *s); -void mq_runtime_free(struct mq_runtime *s); - typedef void mq_msg_free_cb_func(void *msg, void *msg_free_arg); typedef void on_msg_cb_func(int topic_id, void *msg, void *on_msg_arg); typedef void on_msg_dispatch_cb_func(int topic_id, @@ -44,8 +40,14 @@ int mq_schema_destroy_topic(struct mq_schema *s, int topic_id); int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void * on_msg_cb_arg); -int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *msg); -void mq_runtime_dispatch(struct mq_runtime *rt); +struct mq_runtime; +struct mq_runtime *mq_runtime_new(struct mq_schema *s); +void mq_runtime_free(struct mq_runtime *s); + +// return 0 if success, otherwise return -1 +int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *msg);// append message to pending queue +void mq_runtime_dispatch(struct mq_runtime *rt);// dispatch all message in pending queue, dispatched message will be append to dlq +void mq_runtime_clean(struct mq_runtime *rt); // free all message in dlq and pending queue, during this period, publish will be disabled #ifdef __cplusplus } diff --git a/infra/mq/mq.c b/infra/mq/mq.c index c4b315c..13a388b 100644 --- a/infra/mq/mq.c +++ b/infra/mq/mq.c @@ -109,42 +109,45 @@ int mq_schema_destroy_topic(struct mq_schema *s, int topic_id) static void mq_dispatch_one_message(struct mq_schema *s, struct mq_message *mq_elt) { struct mq_subscriber *sub_elt, *sub_tmp; - struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(s->topic_array, - (unsigned int)(mq_elt->header.topic_id)); + struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(s->topic_array,(unsigned int)(mq_elt->header.topic_id)); if (topic) { DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) { if (sub_elt->msg_cb) { - if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->msg_cb, sub_elt->msg_cb_arg, topic->dispatch_cb_arg); + if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id, + mq_elt->body, + sub_elt->msg_cb, + sub_elt->msg_cb_arg, + topic->dispatch_cb_arg); else sub_elt->msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb_arg); } } } } -static void mq_runtime_clean(struct mq_runtime *rt) +void mq_runtime_clean(struct mq_runtime *rt) { struct mq_message *mq_elt, *tmp; struct mq_topic *topic; + rt->publish_enabled=false; - for(int i=0; i < STELLAR_MQ_PRIORITY_MAX; i++) - { - assert(rt->priority_mq[i]==NULL); - } - - DL_FOREACH_SAFE(rt->dealth_letter_queue, mq_elt, tmp) + for (int i = 0; i < STELLAR_MQ_MAX; i++) { - topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, - (unsigned int)(mq_elt->header.topic_id)); - if (topic && topic->free_cb) + DL_FOREACH_SAFE(rt->priority_mq[i], mq_elt, tmp) { - topic->free_cb(mq_elt->body, topic->free_cb_arg); + topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id)); + if (topic && topic->free_cb) + { + topic->free_cb(mq_elt->body, topic->free_cb_arg); + } + DL_DELETE(rt->priority_mq[STELLAR_MQ_DEATH_LETTER], mq_elt); + FREE(mq_elt); } - DL_DELETE(rt->dealth_letter_queue, mq_elt); - FREE(mq_elt); } + + rt->publish_enabled=true; } void mq_runtime_dispatch(struct mq_runtime *rt) @@ -160,15 +163,14 @@ void mq_runtime_dispatch(struct mq_runtime *rt) } DL_FOREACH_SAFE(rt->priority_mq[cur_priority], mq_elt, mq_tmp) { - mq_dispatch_one_message(rt->schema, mq_elt); DL_DELETE(rt->priority_mq[mq_elt->header.priority], mq_elt); - DL_APPEND(rt->dealth_letter_queue, mq_elt); // move to dlq list - + mq_dispatch_one_message(rt->schema, mq_elt); + DL_APPEND(rt->priority_mq[STELLAR_MQ_DEATH_LETTER], mq_elt); // move to dlq list cur_priority=STELLAR_MQ_PRIORITY_HIGH; break; } } - mq_runtime_clean(rt); + //mq_runtime_clean(rt); return; } @@ -195,10 +197,12 @@ int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_ms return 0; } -int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_priority priority) +int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority) { if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1; + if(rt->publish_enabled==false)return -1; + unsigned int len = utarray_len(rt->schema->topic_array); if (len <= (unsigned int)topic_id)return -1; struct mq_message *msg= CALLOC(struct mq_message,1); @@ -241,6 +245,7 @@ struct mq_runtime *mq_runtime_new(struct mq_schema *s) if(s==NULL)return NULL; struct mq_runtime *rt = CALLOC(struct mq_runtime,1); rt->schema=s; + rt->publish_enabled=true; return rt; } diff --git a/infra/mq/mq_internal.h b/infra/mq/mq_internal.h index c246894..0ce5b60 100644 --- a/infra/mq/mq_internal.h +++ b/infra/mq/mq_internal.h @@ -8,12 +8,15 @@ extern "C" #include "stellar/mq.h" #include "uthash/utarray.h" -enum mq_priority +#include + +enum mq_property { - STELLAR_MQ_PRIORITY_LOW = 0, + STELLAR_MQ_DEATH_LETTER = 0, + STELLAR_MQ_PRIORITY_LOW = 1, STELLAR_MQ_PRIORITY_MEDIUM, STELLAR_MQ_PRIORITY_HIGH, - STELLAR_MQ_PRIORITY_MAX + STELLAR_MQ_MAX }; struct mq_message @@ -22,7 +25,7 @@ struct mq_message struct { int topic_id; - enum mq_priority priority; + enum mq_property priority; } header; void *body; struct mq_message *next, *prev; @@ -62,11 +65,11 @@ struct mq_schema struct mq_runtime { struct mq_schema *schema; - struct mq_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list - struct mq_message *dealth_letter_queue;// dlq list + struct mq_message *priority_mq[STELLAR_MQ_MAX];// message list + bool publish_enabled; }; -int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_priority priority); +int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority); #ifdef __cplusplus } diff --git a/infra/mq/test/gtest_mq_main.cpp b/infra/mq/test/gtest_mq_main.cpp index 423b41d..dbe0f25 100644 --- a/infra/mq/test/gtest_mq_main.cpp +++ b/infra/mq/test/gtest_mq_main.cpp @@ -256,13 +256,135 @@ TEST(mq_runtime, basic_pub_sub) { mq_runtime_dispatch(rt); } + mq_runtime_free(rt); + mq_schema_free(s); EXPECT_EQ(N_packet*2*topic_id_num, env.msg_pub_cnt); EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); - mq_runtime_free(rt); - mq_schema_free(s); + +} + +struct test_pub_on_free_env +{ + mq_schema *s; + mq_runtime *rt; + int N_round; + int current_round; + int topic_id; + int on_msg_free_called; + int on_msg_called; +}; + + +static void test_pub_on_msg_free(void *msg, void *msg_free_arg) +{ + struct test_pub_on_free_env *env = (struct test_pub_on_free_env *)msg_free_arg; + env->on_msg_free_called+=1; + if(env->current_round==env->N_round-1 && (long)msg!=env->N_round) + { + EXPECT_EQ(mq_runtime_publish_message(env->rt, env->topic_id, (void *)(long)(env->N_round)), -1);//on message free, publish always failed + } + return; +} + +static void test_pub_on_free_on_msg(int topic_id, void *msg, void *sub_arg) +{ + struct test_pub_on_free_env *env = (struct test_pub_on_free_env *)sub_arg; + env->on_msg_called+=1; + if((int)(long)msg==env->N_round) + { + EXPECT_EQ(env->on_msg_called, env->N_round+1); + EXPECT_EQ(env->current_round, env->N_round-1); + } + return; +} + +TEST(mq_runtime, pub_on_msg_free) +{ + struct test_pub_on_free_env env={}; + env.s=mq_schema_new(); + EXPECT_TRUE(env.s!=NULL); + env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_pub_on_msg_free , &env); + EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_pub_on_free_on_msg, &env), 0); + + env.N_round=10; + env.rt=mq_runtime_new(env.s); + EXPECT_TRUE(env.rt!=NULL); + for(int i=0; ion_msg_free_called+=1; + return; +} + +static void test_dispatch_on_msg(int topic_id, void *msg, void *sub_arg) +{ + struct test_dispatch_on_msg_env *env = (struct test_dispatch_on_msg_env *)sub_arg; + env->on_msg_called+=1; + if(env->current_round==(long)msg) + { + EXPECT_EQ(mq_runtime_publish_message(env->rt, env->topic_id, (void*)(long)env->N_round), 0); + } + else + { + EXPECT_EQ((long)msg, env->N_round); + } + mq_runtime_dispatch(env->rt); + return; +} + +TEST(mq_runtime, call_dispatch_on_msg_dispatch) +{ + struct test_dispatch_on_msg_env env={}; + env.s=mq_schema_new(); + EXPECT_TRUE(env.s!=NULL); + env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_dispatch_on_msg_free , &env); + EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_dispatch_on_msg, &env), 0); + + env.N_round=10; + env.rt=mq_runtime_new(env.s); + EXPECT_TRUE(env.rt!=NULL); + for(int i=0; imsg_free_cnt+=1; - FREE(msg); - return; -} - -static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env) -{ - struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; - EXPECT_TRUE(env!=NULL); - env->msg_sub_cnt+=1; - return; -} - -static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env) -{ - struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; - EXPECT_TRUE(env!=NULL); - //EXPECT_EQ(pkt->ip_proto, ip_protocol); - int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); - unsigned int cnt=0; - int *msg; - for(int i=0; iplug_mgr->max_message_dispatch; j++) - { - msg=CALLOC(int, 1); - *msg=cnt; - int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg); - if(cnt < env->plug_mgr->max_message_dispatch) - { - ASSERT_EQ(pub_ret, 0); - env->msg_pub_cnt+=1; - } - else - { - ASSERT_EQ(pub_ret, -1); - } - if(pub_ret!=0)FREE(msg); - cnt+=1; - } - } - return; -} - -TEST(mq_runtime, packet_plugins_pub_overlimit) { - - struct stellar st={0}; - struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); - whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); - - unsigned char ip_proto=6; - struct packet_plugin_env env; - memset(&env, 0, sizeof(struct packet_plugin_env)); - env.plug_mgr=plug_mgr; - char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; - - int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); - - for(int i=0; istellar_mq_schema_array, env.packet_topic_id[i]); - EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func); - EXPECT_EQ(topic->free_cb_arg, &env); - EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); - EXPECT_STREQ(topic->topic_name, topic_name[i]); - } - } - - { - SCOPED_TRACE("White-box test, check stellar internal schema"); - EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); - } - - int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env); - EXPECT_GE(pub_plugin_id, 0); - - int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); - - for (int i = 0; i < topic_sub_num; i++) - { - env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok - EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0); - for(int j = 0; j < topic_id_num; j++) - { - EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); - } - } - - { - SCOPED_TRACE("White-box test, check stellar internal schema"); - EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); - } - - struct packet pkt={&st, IPv4, ip_proto}; - - int N_packet=10; - for (int i = 0; i < N_packet; i++) - { - plugin_manager_on_packet_input(plug_mgr, &pkt); - plugin_manager_on_packet_output(plug_mgr, &pkt); - } - - plugin_manager_exit(plug_mgr); - EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt); - EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); - EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); -} - - -#endif struct test_priority_mq_env { @@ -617,6 +618,127 @@ TEST(mq_runtime, basic_mq_priority) { EXPECT_EQ(env.plugin_id_2_called,env.N_round*3); } +#if 0 +//TODO: test case mq for overlimit +static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + FREE(msg); + return; +} + +static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + env->msg_sub_cnt+=1; + return; +} + +static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + //EXPECT_EQ(pkt->ip_proto, ip_protocol); + int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); + unsigned int cnt=0; + int *msg; + for(int i=0; iplug_mgr->max_message_dispatch; j++) + { + msg=CALLOC(int, 1); + *msg=cnt; + int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg); + if(cnt < env->plug_mgr->max_message_dispatch) + { + ASSERT_EQ(pub_ret, 0); + env->msg_pub_cnt+=1; + } + else + { + ASSERT_EQ(pub_ret, -1); + } + if(pub_ret!=0)FREE(msg); + cnt+=1; + } + } + return; +} + +TEST(mq_runtime, pub_overlimit) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; + + int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); + + for(int i=0; istellar_mq_schema_array, env.packet_topic_id[i]); + EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func); + EXPECT_EQ(topic->free_cb_arg, &env); + EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); + EXPECT_STREQ(topic->topic_name, topic_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); + } + + int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env); + EXPECT_GE(pub_plugin_id, 0); + + int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); + + for (int i = 0; i < topic_sub_num; i++) + { + env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok + EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0); + for(int j = 0; j < topic_id_num; j++) + { + EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + for (int i = 0; i < N_packet; i++) + { + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); + } + + plugin_manager_exit(plug_mgr); + EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt); + EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); + EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); +} + + +#endif /**********************************************