✨ feat(add publish at once API): define in mq_internal.h temporarily
This commit is contained in:
@@ -106,25 +106,46 @@ int mq_schema_destroy_topic(struct mq_schema *s, int topic_id)
|
||||
}
|
||||
|
||||
|
||||
static void mq_dispatch_one_message(struct mq_schema *s, struct mq_message *mq_elt)
|
||||
static int mq_dispatch_one_message(struct mq_topic *topic, struct mq_message *mq_elt)
|
||||
{
|
||||
struct mq_subscriber *sub_elt, *sub_tmp;
|
||||
struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(s->topic_array,(unsigned int)(mq_elt->header.topic_id));
|
||||
if (topic)
|
||||
if(topic==NULL || mq_elt==NULL)return -1;
|
||||
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
|
||||
{
|
||||
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
|
||||
if (sub_elt->msg_cb)
|
||||
{
|
||||
if (sub_elt->msg_cb)
|
||||
{
|
||||
if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,
|
||||
mq_elt->body,
|
||||
sub_elt->msg_cb,
|
||||
sub_elt->msg_cb_arg,
|
||||
topic->dispatch_cb_arg);
|
||||
else sub_elt->msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb_arg);
|
||||
}
|
||||
if (topic->dispatch_cb)
|
||||
topic->dispatch_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb, sub_elt->msg_cb_arg,
|
||||
topic->dispatch_cb_arg);
|
||||
else
|
||||
sub_elt->msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb_arg);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int mq_runtime_publish_message_at_once(struct mq_runtime *rt, int topic_id, void *msg)
|
||||
{
|
||||
if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1;
|
||||
//if(rt->is_cleaning==true)return -1;
|
||||
unsigned int len = utarray_len(rt->schema->topic_array);
|
||||
if (len <= (unsigned int)topic_id)return -1;
|
||||
|
||||
struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array,(unsigned int)(topic_id));
|
||||
if(topic==NULL)return -1;
|
||||
|
||||
struct mq_message *mq_elt = CALLOC(struct mq_message,1);
|
||||
mq_elt->rt=rt;
|
||||
mq_elt->header.topic_id = topic_id;
|
||||
mq_elt->header.priority = STELLAR_MQ_PRIORITY_HIGH;
|
||||
mq_elt->body = msg;
|
||||
mq_dispatch_one_message(topic, mq_elt);
|
||||
if (topic->free_cb)
|
||||
{
|
||||
topic->free_cb(mq_elt->body, topic->free_cb_arg);
|
||||
}
|
||||
FREE(mq_elt);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mq_runtime_clean(struct mq_runtime *rt)
|
||||
@@ -155,6 +176,7 @@ void mq_runtime_clean(struct mq_runtime *rt)
|
||||
|
||||
void mq_runtime_dispatch(struct mq_runtime *rt)
|
||||
{
|
||||
struct mq_topic *topic=NULL;
|
||||
struct mq_message *mq_elt=NULL, *mq_tmp=NULL;
|
||||
int cur_priority = STELLAR_MQ_PRIORITY_HIGH;
|
||||
while(cur_priority >= STELLAR_MQ_PRIORITY_LOW)
|
||||
@@ -168,7 +190,8 @@ void mq_runtime_dispatch(struct mq_runtime *rt)
|
||||
{
|
||||
DL_DELETE(rt->priority_mq[mq_elt->header.priority], mq_elt);
|
||||
rt->priority_mq_len[mq_elt->header.priority]-=1;
|
||||
mq_dispatch_one_message(rt->schema, mq_elt);
|
||||
topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array,(unsigned int)(mq_elt->header.topic_id));
|
||||
mq_dispatch_one_message(topic, mq_elt);
|
||||
DL_APPEND(rt->priority_mq[STELLAR_MQ_DEATH_LETTER], mq_elt); // move to dlq list
|
||||
rt->priority_mq_len[STELLAR_MQ_DEATH_LETTER]+=1;
|
||||
cur_priority=STELLAR_MQ_PRIORITY_HIGH;
|
||||
|
||||
@@ -70,7 +70,8 @@ struct mq_runtime
|
||||
bool is_cleaning;
|
||||
};
|
||||
|
||||
int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority);
|
||||
int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *msg, enum mq_property priority);
|
||||
int mq_runtime_publish_message_at_once(struct mq_runtime *rt, int topic_id, void *msg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user