✨ feat(mq): add qlen in mq_runtime struct
This commit is contained in:
@@ -133,7 +133,7 @@ void mq_runtime_clean(struct mq_runtime *rt)
|
||||
|
||||
struct mq_message *mq_elt, *tmp;
|
||||
struct mq_topic *topic;
|
||||
rt->publish_enabled=false;
|
||||
rt->is_cleaning=true;
|
||||
|
||||
for (unsigned long i = 0; i < count_of(rt->priority_mq); i++)
|
||||
{
|
||||
@@ -145,11 +145,12 @@ void mq_runtime_clean(struct mq_runtime *rt)
|
||||
topic->free_cb(mq_elt->body, topic->free_cb_arg);
|
||||
}
|
||||
DL_DELETE(rt->priority_mq[i], mq_elt);
|
||||
rt->priority_mq_len[i]-=1;
|
||||
FREE(mq_elt);
|
||||
}
|
||||
}
|
||||
|
||||
rt->publish_enabled=true;
|
||||
rt->is_cleaning=false;
|
||||
}
|
||||
|
||||
void mq_runtime_dispatch(struct mq_runtime *rt)
|
||||
@@ -166,8 +167,10 @@ void mq_runtime_dispatch(struct mq_runtime *rt)
|
||||
DL_FOREACH_SAFE(rt->priority_mq[cur_priority], mq_elt, mq_tmp)
|
||||
{
|
||||
DL_DELETE(rt->priority_mq[mq_elt->header.priority], mq_elt);
|
||||
rt->priority_mq_len[mq_elt->header.priority]-=1;
|
||||
mq_dispatch_one_message(rt->schema, mq_elt);
|
||||
DL_APPEND(rt->priority_mq[STELLAR_MQ_DEATH_LETTER], mq_elt); // move to dlq list
|
||||
rt->priority_mq_len[STELLAR_MQ_DEATH_LETTER]+=1;
|
||||
cur_priority=STELLAR_MQ_PRIORITY_HIGH;
|
||||
break;
|
||||
}
|
||||
@@ -202,7 +205,7 @@ int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_ms
|
||||
int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority)
|
||||
{
|
||||
if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1;
|
||||
if(rt->publish_enabled==false)return -1;
|
||||
if(rt->is_cleaning==true)return -1;
|
||||
if(priority < STELLAR_MQ_PRIORITY_LOW || priority > STELLAR_MQ_PRIORITY_HIGH)return -1;
|
||||
|
||||
unsigned int len = utarray_len(rt->schema->topic_array);
|
||||
@@ -213,6 +216,7 @@ int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id
|
||||
msg->header.priority = priority;
|
||||
msg->body = data;
|
||||
DL_APPEND(rt->priority_mq[priority], msg);
|
||||
rt->priority_mq_len[priority]+=1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -247,7 +251,7 @@ struct mq_runtime *mq_runtime_new(struct mq_schema *s)
|
||||
if(s==NULL)return NULL;
|
||||
struct mq_runtime *rt = CALLOC(struct mq_runtime,1);
|
||||
rt->schema=s;
|
||||
rt->publish_enabled=true;
|
||||
rt->is_cleaning=false;
|
||||
return rt;
|
||||
}
|
||||
|
||||
|
||||
@@ -66,7 +66,8 @@ struct mq_runtime
|
||||
{
|
||||
struct mq_schema *schema;
|
||||
struct mq_message *priority_mq[STELLAR_MQ_MAX];// message list
|
||||
bool publish_enabled;
|
||||
size_t priority_mq_len[STELLAR_MQ_MAX];
|
||||
bool is_cleaning;
|
||||
};
|
||||
|
||||
int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority);
|
||||
|
||||
Reference in New Issue
Block a user