diff --git a/infra/mq/mq.c b/infra/mq/mq.c index a054e19..e17e996 100644 --- a/infra/mq/mq.c +++ b/infra/mq/mq.c @@ -132,7 +132,6 @@ int mq_runtime_publish_message_immediate(struct mq_runtime *rt, int topic_id, vo struct mq_message mq_elt; mq_elt.rt=rt; mq_elt.header.topic_id = topic_id; - mq_elt.header.priority = STELLAR_MQ_PRIORITY_HIGH; mq_elt.body = msg; mq_dispatch_one_message(topic, &mq_elt); if (topic->free_cb) @@ -150,21 +149,20 @@ void mq_runtime_clean(struct mq_runtime *rt) struct mq_topic *topic; rt->is_cleaning=true; - for (unsigned long i = 0; i < count_of(rt->priority_mq); i++) + for (int i = 0; i < MQ_TYPE_MAX; i++) { - DL_FOREACH_SAFE(rt->priority_mq[i], mq_elt, tmp) + DL_FOREACH_SAFE(rt->mq[i], mq_elt, tmp) { topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id)); if (topic && topic->free_cb) { topic->free_cb(mq_elt->body, topic->free_cb_arg); } - DL_DELETE(rt->priority_mq[i], mq_elt); - rt->priority_mq_len[i]-=1; + DL_DELETE(rt->mq[i], mq_elt); + rt->mq_len[i] -= 1; FREE(mq_elt); } } - rt->is_cleaning=false; } @@ -172,27 +170,19 @@ void mq_runtime_dispatch(struct mq_runtime *rt) { struct mq_topic *topic=NULL; struct mq_message *mq_elt=NULL, *mq_tmp=NULL; - int cur_priority = STELLAR_MQ_PRIORITY_HIGH; - while(cur_priority >= STELLAR_MQ_PRIORITY_LOW) - { - if(rt->priority_mq[cur_priority]==NULL) - { - cur_priority--; - continue; - } - DL_FOREACH_SAFE(rt->priority_mq[cur_priority], mq_elt, mq_tmp) + while (rt->mq_len[MQ_TYPE_MAILBOX]) + { + DL_FOREACH_SAFE(rt->mq[MQ_TYPE_MAILBOX], mq_elt, mq_tmp) { - DL_DELETE(rt->priority_mq[mq_elt->header.priority], mq_elt); - rt->priority_mq_len[mq_elt->header.priority]-=1; - topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array,(unsigned int)(mq_elt->header.topic_id)); - mq_dispatch_one_message(topic, mq_elt); - DL_APPEND(rt->priority_mq[STELLAR_MQ_DEATH_LETTER], mq_elt); // move to dlq list - rt->priority_mq_len[STELLAR_MQ_DEATH_LETTER]+=1; - cur_priority=STELLAR_MQ_PRIORITY_HIGH; - break; - } - } - //mq_runtime_clean(rt); + DL_DELETE(rt->mq[MQ_TYPE_MAILBOX], mq_elt); + rt->mq_len[MQ_TYPE_MAILBOX] -= 1; + topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id)); + mq_dispatch_one_message(topic, mq_elt); + DL_APPEND(rt->mq[MQ_TYPE_DEATH_LETTER], mq_elt); // move to dlq list + rt->mq_len[MQ_TYPE_DEATH_LETTER] += 1; + } + } + //mq_runtime_clean(rt); return; } @@ -213,11 +203,10 @@ int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_ms return 0; } -int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority) +int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *data) { if(rt==NULL || rt->schema == NULL)return -1; if(rt->is_cleaning==true)return -1; - if(priority < STELLAR_MQ_PRIORITY_LOW || priority > STELLAR_MQ_PRIORITY_HIGH)return -1; struct mq_topic *topic = mq_schema_get_topic(rt->schema, topic_id); if(topic==NULL)return -1; @@ -225,18 +214,12 @@ int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id struct mq_message *msg= CALLOC(struct mq_message,1); msg->rt=rt; msg->header.topic_id = topic_id; - msg->header.priority = priority; msg->body = data; - DL_APPEND(rt->priority_mq[priority], msg); - rt->priority_mq_len[priority]+=1; + DL_APPEND(rt->mq[MQ_TYPE_MAILBOX], msg); + rt->mq_len[MQ_TYPE_MAILBOX]+=1; return 0; } -int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *data) -{ - return mq_runtime_publish_message_with_priority(rt, topic_id, data, STELLAR_MQ_PRIORITY_MEDIUM); -} - struct mq_schema *mq_schema_new() { struct mq_schema *s = CALLOC(struct mq_schema,1); diff --git a/infra/mq/mq_internal.h b/infra/mq/mq_internal.h index 146d222..342c79b 100644 --- a/infra/mq/mq_internal.h +++ b/infra/mq/mq_internal.h @@ -10,22 +10,12 @@ extern "C" #include -enum mq_property -{ - STELLAR_MQ_DEATH_LETTER = 0, - STELLAR_MQ_PRIORITY_LOW = 1, - STELLAR_MQ_PRIORITY_MEDIUM, - STELLAR_MQ_PRIORITY_HIGH, - STELLAR_MQ_MAX -}; - struct mq_message { struct mq_runtime *rt; struct { int topic_id; - enum mq_property priority; } header; void *body; struct mq_message *next, *prev; @@ -62,15 +52,21 @@ struct mq_schema }; +enum mq_property +{ + MQ_TYPE_MAILBOX = 0, + MQ_TYPE_DEATH_LETTER = 1, + MQ_TYPE_MAX, +}; + struct mq_runtime { struct mq_schema *schema; - struct mq_message *priority_mq[STELLAR_MQ_MAX];// message list - size_t priority_mq_len[STELLAR_MQ_MAX]; + struct mq_message *mq[MQ_TYPE_MAX];// message queue + size_t mq_len[MQ_TYPE_MAX]; bool is_cleaning; }; -int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *msg, enum mq_property priority); int mq_runtime_publish_message_immediate(struct mq_runtime *rt, int topic_id, void *msg); #ifdef __cplusplus diff --git a/infra/mq/test/gtest_mq_main.cpp b/infra/mq/test/gtest_mq_main.cpp index a2ee952..06050b2 100644 --- a/infra/mq/test/gtest_mq_main.cpp +++ b/infra/mq/test/gtest_mq_main.cpp @@ -304,7 +304,7 @@ TEST(mq_runtime, basic_pub_sub) { EXPECT_TRUE(rt!=NULL); env.rt=rt; - int N_packet=10; + int N_packet=1; for (int i = 0; i < N_packet; i++) { mq_runtime_publish_message(rt, packet_in_topic_id, &pkt); @@ -537,6 +537,7 @@ TEST(mq_runtime, sub_with_dispatch_cb) { EXPECT_EQ(env.sess_msg_free_called, env.N_session); } +#if 0 struct test_priority_mq_env { struct mq_schema *s; @@ -671,7 +672,7 @@ TEST(mq_runtime, basic_mq_priority) { EXPECT_EQ(env.plugin_id_1_called,env.N_round*3); EXPECT_EQ(env.plugin_id_2_called,env.N_round*3); } - +#endif struct test_polling_module {