From 154e727f07fd2e1a9544e1a3941865fe247bd011 Mon Sep 17 00:00:00 2001 From: yangwei Date: Wed, 25 Sep 2024 17:44:27 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(mq):=20add=20qlen=20in=20mq=5F?= =?UTF-8?q?runtime=20struct?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- infra/mq/mq.c | 12 ++++++++---- infra/mq/mq_internal.h | 3 ++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/infra/mq/mq.c b/infra/mq/mq.c index aca3d73..9e75907 100644 --- a/infra/mq/mq.c +++ b/infra/mq/mq.c @@ -133,7 +133,7 @@ void mq_runtime_clean(struct mq_runtime *rt) struct mq_message *mq_elt, *tmp; struct mq_topic *topic; - rt->publish_enabled=false; + rt->is_cleaning=true; for (unsigned long i = 0; i < count_of(rt->priority_mq); i++) { @@ -145,11 +145,12 @@ void mq_runtime_clean(struct mq_runtime *rt) topic->free_cb(mq_elt->body, topic->free_cb_arg); } DL_DELETE(rt->priority_mq[i], mq_elt); + rt->priority_mq_len[i]-=1; FREE(mq_elt); } } - rt->publish_enabled=true; + rt->is_cleaning=false; } void mq_runtime_dispatch(struct mq_runtime *rt) @@ -166,8 +167,10 @@ void mq_runtime_dispatch(struct mq_runtime *rt) DL_FOREACH_SAFE(rt->priority_mq[cur_priority], mq_elt, mq_tmp) { DL_DELETE(rt->priority_mq[mq_elt->header.priority], mq_elt); + rt->priority_mq_len[mq_elt->header.priority]-=1; mq_dispatch_one_message(rt->schema, mq_elt); DL_APPEND(rt->priority_mq[STELLAR_MQ_DEATH_LETTER], mq_elt); // move to dlq list + rt->priority_mq_len[STELLAR_MQ_DEATH_LETTER]+=1; cur_priority=STELLAR_MQ_PRIORITY_HIGH; break; } @@ -202,7 +205,7 @@ int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_ms int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority) { if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1; - if(rt->publish_enabled==false)return -1; + if(rt->is_cleaning==true)return -1; if(priority < STELLAR_MQ_PRIORITY_LOW || priority > STELLAR_MQ_PRIORITY_HIGH)return -1; unsigned int len = utarray_len(rt->schema->topic_array); @@ -213,6 +216,7 @@ int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id msg->header.priority = priority; msg->body = data; DL_APPEND(rt->priority_mq[priority], msg); + rt->priority_mq_len[priority]+=1; return 0; } @@ -247,7 +251,7 @@ struct mq_runtime *mq_runtime_new(struct mq_schema *s) if(s==NULL)return NULL; struct mq_runtime *rt = CALLOC(struct mq_runtime,1); rt->schema=s; - rt->publish_enabled=true; + rt->is_cleaning=false; return rt; } diff --git a/infra/mq/mq_internal.h b/infra/mq/mq_internal.h index 0ce5b60..4e7a31f 100644 --- a/infra/mq/mq_internal.h +++ b/infra/mq/mq_internal.h @@ -66,7 +66,8 @@ struct mq_runtime { struct mq_schema *schema; struct mq_message *priority_mq[STELLAR_MQ_MAX];// message list - bool publish_enabled; + size_t priority_mq_len[STELLAR_MQ_MAX]; + bool is_cleaning; }; int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority);