✨ feat(mq): add mq_runtime_defer, default disable
This commit is contained in:
@@ -42,6 +42,7 @@ int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_ms
|
|||||||
|
|
||||||
struct mq_runtime;
|
struct mq_runtime;
|
||||||
struct mq_runtime *mq_runtime_new(struct mq_schema *s);
|
struct mq_runtime *mq_runtime_new(struct mq_schema *s);
|
||||||
|
void mq_runtime_defer(struct mq_runtime *rt);
|
||||||
void mq_runtime_free(struct mq_runtime *s);
|
void mq_runtime_free(struct mq_runtime *s);
|
||||||
|
|
||||||
// return 0 if success, otherwise return -1
|
// return 0 if success, otherwise return -1
|
||||||
|
|||||||
@@ -270,6 +270,5 @@ void stellar_polling_dispatch(struct stellar_module_manager *mod_mgr)
|
|||||||
if(mod_mgr==NULL)return;
|
if(mod_mgr==NULL)return;
|
||||||
stellar_module_manager_polling_active(mod_mgr);
|
stellar_module_manager_polling_active(mod_mgr);
|
||||||
mq_runtime_dispatch(local_mq_rt);
|
mq_runtime_dispatch(local_mq_rt);
|
||||||
mq_runtime_clean(local_mq_rt);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -149,7 +149,7 @@ void mq_runtime_clean(struct mq_runtime *rt)
|
|||||||
struct mq_topic *topic;
|
struct mq_topic *topic;
|
||||||
rt->is_cleaning=true;
|
rt->is_cleaning=true;
|
||||||
|
|
||||||
for (int i = 0; i < MQ_TYPE_MAX; i++)
|
for (int i = 0; i < MQ_MAX; i++)
|
||||||
{
|
{
|
||||||
DL_FOREACH_SAFE(rt->mq[i], mq_elt, tmp)
|
DL_FOREACH_SAFE(rt->mq[i], mq_elt, tmp)
|
||||||
{
|
{
|
||||||
@@ -170,19 +170,27 @@ void mq_runtime_dispatch(struct mq_runtime *rt)
|
|||||||
{
|
{
|
||||||
struct mq_topic *topic=NULL;
|
struct mq_topic *topic=NULL;
|
||||||
struct mq_message *mq_elt=NULL, *mq_tmp=NULL;
|
struct mq_message *mq_elt=NULL, *mq_tmp=NULL;
|
||||||
while (rt->mq_len[MQ_TYPE_MAILBOX])
|
while (rt->mq_len[MQ_MAILBOX])
|
||||||
{
|
{
|
||||||
DL_FOREACH_SAFE(rt->mq[MQ_TYPE_MAILBOX], mq_elt, mq_tmp)
|
DL_FOREACH_SAFE(rt->mq[MQ_MAILBOX], mq_elt, mq_tmp)
|
||||||
{
|
{
|
||||||
DL_DELETE(rt->mq[MQ_TYPE_MAILBOX], mq_elt);
|
DL_DELETE(rt->mq[MQ_MAILBOX], mq_elt);
|
||||||
rt->mq_len[MQ_TYPE_MAILBOX] -= 1;
|
rt->mq_len[MQ_MAILBOX] -= 1;
|
||||||
topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id));
|
topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id));
|
||||||
mq_dispatch_one_message(topic, mq_elt);
|
mq_dispatch_one_message(topic, mq_elt);
|
||||||
DL_APPEND(rt->mq[MQ_TYPE_DEATH_LETTER], mq_elt); // move to dlq list
|
if (rt->defer_enabled==true)
|
||||||
rt->mq_len[MQ_TYPE_DEATH_LETTER] += 1;
|
{
|
||||||
|
DL_APPEND(rt->mq[MQ_DEATH_LETTER], mq_elt); // move to dlq list
|
||||||
|
rt->mq_len[MQ_DEATH_LETTER] += 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if(topic->free_cb)topic->free_cb(mq_elt->body, topic->free_cb_arg);
|
||||||
|
FREE(mq_elt);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//mq_runtime_clean(rt);
|
mq_runtime_clean(rt);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -215,8 +223,14 @@ int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *data)
|
|||||||
msg->rt=rt;
|
msg->rt=rt;
|
||||||
msg->header.topic_id = topic_id;
|
msg->header.topic_id = topic_id;
|
||||||
msg->body = data;
|
msg->body = data;
|
||||||
DL_APPEND(rt->mq[MQ_TYPE_MAILBOX], msg);
|
DL_APPEND(rt->mq[MQ_MAILBOX], msg);
|
||||||
rt->mq_len[MQ_TYPE_MAILBOX]+=1;
|
rt->mq_len[MQ_MAILBOX]+=1;
|
||||||
|
|
||||||
|
if(rt->defer_enabled==false)
|
||||||
|
{
|
||||||
|
mq_runtime_dispatch(rt);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -250,6 +264,12 @@ struct mq_runtime *mq_runtime_new(struct mq_schema *s)
|
|||||||
return rt;
|
return rt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mq_runtime_defer(struct mq_runtime *rt)
|
||||||
|
{
|
||||||
|
if(rt==NULL)return;
|
||||||
|
rt->defer_enabled=true;
|
||||||
|
}
|
||||||
|
|
||||||
void mq_runtime_free(struct mq_runtime *rt)
|
void mq_runtime_free(struct mq_runtime *rt)
|
||||||
{
|
{
|
||||||
if(rt==NULL)return;
|
if(rt==NULL)return;
|
||||||
|
|||||||
@@ -54,17 +54,18 @@ struct mq_schema
|
|||||||
|
|
||||||
enum mq_property
|
enum mq_property
|
||||||
{
|
{
|
||||||
MQ_TYPE_MAILBOX = 0,
|
MQ_MAILBOX = 0,
|
||||||
MQ_TYPE_DEATH_LETTER = 1,
|
MQ_DEATH_LETTER = 1,
|
||||||
MQ_TYPE_MAX,
|
MQ_MAX,
|
||||||
};
|
};
|
||||||
|
|
||||||
struct mq_runtime
|
struct mq_runtime
|
||||||
{
|
{
|
||||||
struct mq_schema *schema;
|
struct mq_schema *schema;
|
||||||
struct mq_message *mq[MQ_TYPE_MAX];// message queue
|
struct mq_message *mq[MQ_MAX];// message queue
|
||||||
size_t mq_len[MQ_TYPE_MAX];
|
size_t mq_len[MQ_MAX];
|
||||||
bool is_cleaning;
|
bool is_cleaning;
|
||||||
|
bool defer_enabled;
|
||||||
};
|
};
|
||||||
|
|
||||||
int mq_runtime_publish_message_immediate(struct mq_runtime *rt, int topic_id, void *msg);
|
int mq_runtime_publish_message_immediate(struct mq_runtime *rt, int topic_id, void *msg);
|
||||||
|
|||||||
@@ -161,7 +161,7 @@ void test_pub_and_clean_on_msg(int topic_id, void *msg, void *sub_arg)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(mq_runtime, pub_then_clean) {
|
TEST(mq_runtime, defer_pub_then_clean) {
|
||||||
|
|
||||||
struct test_pub_and_clean_env env={};
|
struct test_pub_and_clean_env env={};
|
||||||
env.s = mq_schema_new();
|
env.s = mq_schema_new();
|
||||||
@@ -174,6 +174,8 @@ TEST(mq_runtime, pub_then_clean) {
|
|||||||
env.rt=mq_runtime_new(env.s);
|
env.rt=mq_runtime_new(env.s);
|
||||||
EXPECT_TRUE(env.rt!=NULL);
|
EXPECT_TRUE(env.rt!=NULL);
|
||||||
|
|
||||||
|
mq_runtime_defer(env.rt);
|
||||||
|
|
||||||
for(int i=0; i<env.N_round;i++)
|
for(int i=0; i<env.N_round;i++)
|
||||||
{
|
{
|
||||||
env.current_round=i;
|
env.current_round=i;
|
||||||
@@ -370,6 +372,7 @@ TEST(mq_runtime, pub_on_msg_free)
|
|||||||
env.N_round=10;
|
env.N_round=10;
|
||||||
env.rt=mq_runtime_new(env.s);
|
env.rt=mq_runtime_new(env.s);
|
||||||
EXPECT_TRUE(env.rt!=NULL);
|
EXPECT_TRUE(env.rt!=NULL);
|
||||||
|
mq_runtime_defer(env.rt);
|
||||||
for(int i=0; i<env.N_round;i++)
|
for(int i=0; i<env.N_round;i++)
|
||||||
{
|
{
|
||||||
env.current_round=i;
|
env.current_round=i;
|
||||||
|
|||||||
Reference in New Issue
Block a user