This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
stellar-stellar/infra/mq/mq.c

261 lines
7.6 KiB
C

#include "mq_internal.h"
#include "stellar/utils.h"
#include "uthash/utlist.h"
/*******************************
* STELLAR MQ *
*******************************/
static void mq_topic_schema_copy(void *_dst, const void *_src)
{
struct mq_topic *dst = (struct mq_topic *)_dst,
*src = (struct mq_topic *)_src;
memcpy(_dst, _src, sizeof(struct mq_topic));
dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
}
/*
 * Destructor hook installed in mq_topic_schema_icd.
 * Releases only the topic_name string held by the element; the element
 * storage itself is deliberately not freed here.
 */
static void mq_topic_schema_dtor(void *_elt)
{
    struct mq_topic *topic = (struct mq_topic *)_elt;

    if (topic->topic_name == NULL)
    {
        return;
    }
    FREE(topic->topic_name);
}
/* Element descriptor passed to utarray_new() for the schema's topic array:
 * element size, no init hook, then the copy and destructor hooks defined above. */
UT_icd mq_topic_schema_icd = {sizeof(struct mq_topic), NULL, mq_topic_schema_copy, mq_topic_schema_dtor};
int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name)
{
if(topic_name == NULL || s == NULL || s->topic_array == NULL )return -1;
unsigned int len = utarray_len(s->topic_array);
struct mq_topic *t_schema;
for(unsigned int i = 0; i < len; i++)
{
t_schema = (struct mq_topic *)utarray_eltptr(s->topic_array, i);
if(strcmp(t_schema->topic_name, topic_name) == 0)
{
return i;
}
}
return -1;
}
/*
 * Replace the dispatch and free callbacks (and their arguments) of an
 * existing topic.
 *
 * s        - schema owning the topic; may be NULL.
 * topic_id - index of the topic inside the schema's topic array.
 *
 * Returns 0 on success, -1 when the schema or its topic array is NULL or
 * topic_id does not address an existing element. Negative ids are rejected
 * because the unsigned conversion makes them larger than any valid length.
 */
int mq_schema_update_topic(struct mq_schema *s, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
    if (s == NULL || s->topic_array == NULL)
    {
        return -1;
    }

    unsigned int len = utarray_len(s->topic_array);
    // Valid indices are 0 .. len-1, so topic_id == len is out of range too
    // (same bound as destroy/subscribe/publish).
    if (len <= (unsigned int)topic_id)
    {
        return -1;
    }

    struct mq_topic *t_schema = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
    if (t_schema == NULL)
    {
        return -1;
    }

    t_schema->dispatch_cb = on_dispatch_cb;
    t_schema->dispatch_cb_arg = on_dispatch_arg;
    t_schema->free_cb = msg_free_cb;
    t_schema->free_cb_arg = msg_free_arg;
    return 0;
}
/*
 * Register a new topic in the schema.
 *
 * The topic array is created on first use. A name that already resolves via
 * mq_schema_get_topic_id() is rejected. The name string is duplicated by the
 * array's copy hook, so the caller keeps ownership of topic_name.
 *
 * Returns the new topic id (its index in the topic array), or -1 when s is
 * NULL or the name is already registered.
 *
 * NOTE(review): a NULL topic_name is not rejected here, because the lookup
 * returns -1 for NULL names; confirm whether unnamed topics are intended.
 */
int mq_schema_create_topic(struct mq_schema *s, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
    if (s == NULL)
    {
        return -1;
    }
    if (s->topic_array == NULL)
    {
        utarray_new(s->topic_array, &mq_topic_schema_icd);
    }
    if (mq_schema_get_topic_id(s, topic_name) >= 0)
    {
        return -1;
    }

    unsigned int next_index = utarray_len(s->topic_array);

    struct mq_topic entry;
    memset(&entry, 0, sizeof(struct mq_topic));
    entry.topic_id = next_index; // topic_id equals array index
    entry.topic_name = (char *)topic_name;
    entry.dispatch_cb = on_dispatch_cb;
    entry.dispatch_cb_arg = on_dispatch_arg;
    entry.free_cb = msg_free_cb;
    entry.free_cb_arg = msg_free_arg;
    entry.subscribers = NULL;
    entry.subscriber_cnt = 0;

    utarray_push_back(s->topic_array, &entry);
    s->mq_topic_num += 1;
    return entry.topic_id;
}
/*
 * Mark a topic as destroyed and release all of its subscribers.
 *
 * The array slot is kept (only is_destroyed is set), so the ids of other
 * topics do not change.
 *
 * Returns 1 when the topic was destroyed by this call, 0 when it was already
 * destroyed, and -1 when s or its topic array is NULL or topic_id is out of
 * range.
 */
int mq_schema_destroy_topic(struct mq_schema *s, int topic_id)
{
    if (s == NULL || s->topic_array == NULL)
    {
        return -1;
    }
    if (utarray_len(s->topic_array) <= (unsigned int)topic_id)
    {
        return -1;
    }

    struct mq_topic *target =
        (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
    if (target == NULL)
    {
        return -1;
    }
    if (target->is_destroyed == 1)
    {
        return 0;
    }

    // Pop and free subscribers from the head until the list is empty.
    while (target->subscribers != NULL)
    {
        struct mq_subscriber *head = target->subscribers;
        DL_DELETE(target->subscribers, head);
        FREE(head);
    }

    target->is_destroyed = 1;
    s->mq_topic_num -= 1;
    return 1; // success
}
/*
 * Deliver one message to every subscriber of its topic.
 *
 * Subscribers without a callback are skipped. When the topic has a dispatch
 * callback, it is invoked with the subscriber's callback and argument;
 * otherwise the subscriber's callback is called directly.
 * Nothing happens if the topic id does not resolve to an array element.
 */
static void mq_dispatch_one_message(struct mq_schema *s, struct mq_message *mq_elt)
{
    struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)(mq_elt->header.topic_id));
    if (topic == NULL)
    {
        return;
    }

    struct mq_subscriber *cur, *next;
    DL_FOREACH_SAFE(topic->subscribers, cur, next)
    {
        if (cur->msg_cb == NULL)
        {
            continue;
        }
        if (topic->dispatch_cb != NULL)
        {
            topic->dispatch_cb(mq_elt->header.topic_id,
                               mq_elt->body,
                               cur->msg_cb,
                               cur->msg_cb_arg,
                               topic->dispatch_cb_arg);
        }
        else
        {
            cur->msg_cb(mq_elt->header.topic_id, mq_elt->body, cur->msg_cb_arg);
        }
    }
}
/*
 * Drain every queue in rt->priority_mq and free the queued messages.
 *
 * For each message the owning topic's free callback (if any) is called on the
 * body before the message node is unlinked and freed. publish_enabled is
 * cleared for the duration, so publish calls made meanwhile return -1, and is
 * set back to true at the end. A NULL rt is ignored.
 */
void mq_runtime_clean(struct mq_runtime *rt)
{
    if (rt == NULL)
    {
        return;
    }

    rt->publish_enabled = false;
    for (unsigned long q = 0; q < count_of(rt->priority_mq); q++)
    {
        struct mq_message *msg, *next_msg;
        DL_FOREACH_SAFE(rt->priority_mq[q], msg, next_msg)
        {
            struct mq_topic *owner = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(msg->header.topic_id));
            if (owner != NULL && owner->free_cb != NULL)
            {
                owner->free_cb(msg->body, owner->free_cb_arg);
            }
            DL_DELETE(rt->priority_mq[q], msg);
            FREE(msg);
        }
    }
    rt->publish_enabled = true;
}
void mq_runtime_dispatch(struct mq_runtime *rt)
{
struct mq_message *mq_elt=NULL, *mq_tmp=NULL;
int cur_priority = STELLAR_MQ_PRIORITY_HIGH;
while(cur_priority >= STELLAR_MQ_PRIORITY_LOW)
{
if(rt->priority_mq[cur_priority]==NULL)
{
cur_priority--;
continue;
}
DL_FOREACH_SAFE(rt->priority_mq[cur_priority], mq_elt, mq_tmp)
{
DL_DELETE(rt->priority_mq[mq_elt->header.priority], mq_elt);
mq_dispatch_one_message(rt->schema, mq_elt);
DL_APPEND(rt->priority_mq[STELLAR_MQ_DEATH_LETTER], mq_elt); // move to dlq list
cur_priority=STELLAR_MQ_PRIORITY_HIGH;
break;
}
}
//mq_runtime_clean(rt);
return;
}
/*
 * Add a subscriber to a topic.
 *
 * s             - schema owning the topic; may be NULL.
 * topic_id      - index of the topic inside the schema's topic array.
 * on_msg_cb     - callback stored for the subscriber; subscribers with a NULL
 *                 callback are skipped at dispatch time.
 * on_msg_cb_arg - opaque argument stored alongside the callback.
 *
 * Returns 0 on success. Returns -1 when the schema or its topic array is
 * NULL, topic_id is out of range, the topic has been destroyed, or the
 * subscriber cannot be allocated.
 */
int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void *on_msg_cb_arg)
{
    if (s == NULL || s->topic_array == NULL)
    {
        return -1;
    }

    unsigned int len = utarray_len(s->topic_array);
    if (len <= (unsigned int)topic_id)
    {
        return -1;
    }

    struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
    if (topic == NULL)
    {
        return -1;
    }
    // mq_schema_destroy_topic() returns early for an already destroyed topic,
    // so a subscriber attached afterwards would never be freed.
    if (topic->is_destroyed == 1)
    {
        return -1;
    }

    struct mq_subscriber *new_subscriber = CALLOC(struct mq_subscriber, 1);
    if (new_subscriber == NULL)
    {
        return -1;
    }
    new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
    new_subscriber->msg_cb = on_msg_cb;
    new_subscriber->msg_cb_arg = on_msg_cb_arg;
    DL_APPEND(topic->subscribers, new_subscriber);

    topic->subscriber_cnt += 1;
    s->mq_topic_subscriber_num += 1;
    return 0;
}
/*
 * Queue a message for a topic at the given priority.
 *
 * rt       - runtime to publish on; may be NULL.
 * topic_id - index of the target topic in the runtime's schema.
 * data     - message body pointer; stored as-is, not copied.
 * priority - must lie in [STELLAR_MQ_PRIORITY_LOW, STELLAR_MQ_PRIORITY_HIGH].
 *
 * Returns 0 when the message was appended to the queue for that priority.
 * Returns -1 when rt, its schema or the topic array is NULL, publishing is
 * currently disabled, the priority or topic_id is out of range, or the
 * message node cannot be allocated. On -1 nothing is queued.
 */
int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority)
{
    if (rt == NULL || rt->schema == NULL || rt->schema->topic_array == NULL)
    {
        return -1;
    }
    if (rt->publish_enabled == false)
    {
        return -1;
    }
    if (priority < STELLAR_MQ_PRIORITY_LOW || priority > STELLAR_MQ_PRIORITY_HIGH)
    {
        return -1;
    }

    unsigned int len = utarray_len(rt->schema->topic_array);
    if (len <= (unsigned int)topic_id)
    {
        return -1;
    }

    struct mq_message *msg = CALLOC(struct mq_message, 1);
    // Report allocation failure instead of dereferencing a NULL node.
    if (msg == NULL)
    {
        return -1;
    }
    msg->rt = rt;
    msg->header.topic_id = topic_id;
    msg->header.priority = priority;
    msg->body = data;
    DL_APPEND(rt->priority_mq[priority], msg);
    return 0;
}
/*
 * Queue a message at STELLAR_MQ_PRIORITY_MEDIUM.
 * Arguments and return value are those of
 * mq_runtime_publish_message_with_priority().
 */
int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *data)
{
    int rc = mq_runtime_publish_message_with_priority(rt, topic_id, data, STELLAR_MQ_PRIORITY_MEDIUM);
    return rc;
}
/*
 * Allocate a schema via CALLOC and return it.
 * The topic array is not created here; mq_schema_create_topic() creates it
 * on first use. Release the schema with mq_schema_free().
 */
struct mq_schema *mq_schema_new()
{
    return CALLOC(struct mq_schema, 1);
}
/*
 * Destroy every topic of the schema, free the topic array and then the
 * schema itself. A NULL schema is ignored.
 */
void mq_schema_free(struct mq_schema *s)
{
    if (s == NULL)
    {
        return;
    }

    if (s->topic_array != NULL)
    {
        unsigned int idx = 0;
        while (idx < utarray_len(s->topic_array))
        {
            // Releases the topic's subscribers; the slot itself stays in the array.
            mq_schema_destroy_topic(s, idx);
            idx++;
        }
        utarray_free(s->topic_array);
    }
    FREE(s);
}
/*
 * Create a runtime bound to an existing schema.
 *
 * The runtime stores the schema pointer without copying it, and starts with
 * publishing enabled.
 *
 * Returns the new runtime, or NULL when s is NULL or the allocation fails.
 * Release it with mq_runtime_free().
 */
struct mq_runtime *mq_runtime_new(struct mq_schema *s)
{
    if (s == NULL)
    {
        return NULL;
    }

    struct mq_runtime *rt = CALLOC(struct mq_runtime, 1);
    // Report allocation failure instead of writing through a NULL pointer.
    if (rt == NULL)
    {
        return NULL;
    }
    rt->schema = s;
    rt->publish_enabled = true;
    return rt;
}
/*
 * Drain all queued messages with mq_runtime_clean() and free the runtime.
 * The schema it refers to is not freed. A NULL rt is ignored.
 */
void mq_runtime_free(struct mq_runtime *rt)
{
    if (rt != NULL)
    {
        mq_runtime_clean(rt);
        FREE(rt);
    }
}