#include "mq_internal.h" #include "stellar/utils.h" #include "uthash/utlist.h" /******************************* * STELLAR MQ * *******************************/ static void mq_topic_schema_copy(void *_dst, const void *_src) { struct mq_topic *dst = (struct mq_topic *)_dst, *src = (struct mq_topic *)_src; memcpy(_dst, _src, sizeof(struct mq_topic)); dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL; } static void mq_topic_schema_dtor(void *_elt) { struct mq_topic *elt = (struct mq_topic *)_elt; if (elt->topic_name) FREE(elt->topic_name); // FREE(elt); // free the item } UT_icd mq_topic_schema_icd = {sizeof(struct mq_topic), NULL, mq_topic_schema_copy, mq_topic_schema_dtor}; int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name) { if(topic_name == NULL || s == NULL || s->topic_array == NULL )return -1; unsigned int len = utarray_len(s->topic_array); struct mq_topic *topic; for(unsigned int i = 0; i < len; i++) { topic = (struct mq_topic *)utarray_eltptr(s->topic_array, i); if(strcmp(topic->topic_name, topic_name) == 0) { return i; } } return -1; } static struct mq_topic *mq_schema_get_topic(struct mq_schema *s, int topic_id) { if(s==NULL || s->topic_array == NULL || topic_id < 0)return NULL; unsigned int len = utarray_len(s->topic_array); if (len <= (unsigned int)topic_id)return NULL; return (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); } int mq_schema_update_topic(struct mq_schema *s, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { struct mq_topic *topic = mq_schema_get_topic(s, topic_id); if(topic == NULL)return -1; topic->dispatch_cb=on_dispatch_cb; topic->dispatch_cb_arg=on_dispatch_arg; topic->free_cb=msg_free_cb; topic->free_cb_arg=msg_free_arg; return 0; } int mq_schema_create_topic(struct mq_schema *s, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { if(s==NULL)return -1; if(s->topic_array == NULL) { utarray_new(s->topic_array, &mq_topic_schema_icd); } unsigned int len = utarray_len(s->topic_array); if(mq_schema_get_topic_id(s, topic_name) >= 0) { return -1; } struct mq_topic topic={}; topic.dispatch_cb=on_dispatch_cb; topic.free_cb=msg_free_cb; topic.topic_name=(char *)topic_name; topic.topic_id=len;//topid_id equals arrary index topic.dispatch_cb_arg=on_dispatch_arg; topic.free_cb_arg=msg_free_arg; topic.subscribers=NULL; topic.subscriber_cnt=0; utarray_push_back(s->topic_array, &topic); s->mq_topic_num+=1; return topic.topic_id; } int mq_schema_destroy_topic(struct mq_schema *s, int topic_id) { struct mq_topic *topic = mq_schema_get_topic(s, topic_id); if(topic == NULL)return -1; if (topic->is_destroyed == 1)return 0; struct mq_subscriber *sub_elt, *sub_tmp; DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) { DL_DELETE(topic->subscribers, sub_elt); FREE(sub_elt); } topic->is_destroyed = 1; s->mq_topic_num-=1; return 1; // success } static int mq_dispatch_one_message(struct mq_topic *topic, struct mq_message *mq_elt) { struct mq_subscriber *sub_elt, *sub_tmp; if(topic==NULL || mq_elt==NULL)return -1; DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) { if (sub_elt->msg_cb) { if (topic->dispatch_cb) topic->dispatch_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb, sub_elt->msg_cb_arg, topic->dispatch_cb_arg); else sub_elt->msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb_arg); } } return 0; } int mq_runtime_publish_message_immediate(struct mq_runtime *rt, int topic_id, void *msg) { if(rt==NULL || rt->schema == NULL)return -1; struct mq_topic *topic = mq_schema_get_topic(rt->schema, topic_id); if(topic==NULL)return -1; struct mq_message mq_elt; mq_elt.rt=rt; mq_elt.header.topic_id = topic_id; mq_elt.body = msg; mq_dispatch_one_message(topic, &mq_elt); if (topic->free_cb) { topic->free_cb(mq_elt.body, topic->free_cb_arg); } return 0; } void mq_runtime_clean(struct mq_runtime *rt) { if(rt==NULL)return; struct mq_message *mq_elt, *tmp; struct mq_topic *topic; rt->is_cleaning=true; for (int i = 0; i < MQ_TYPE_MAX; i++) { DL_FOREACH_SAFE(rt->mq[i], mq_elt, tmp) { topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id)); if (topic && topic->free_cb) { topic->free_cb(mq_elt->body, topic->free_cb_arg); } DL_DELETE(rt->mq[i], mq_elt); rt->mq_len[i] -= 1; FREE(mq_elt); } } rt->is_cleaning=false; } void mq_runtime_dispatch(struct mq_runtime *rt) { struct mq_topic *topic=NULL; struct mq_message *mq_elt=NULL, *mq_tmp=NULL; while (rt->mq_len[MQ_TYPE_MAILBOX]) { DL_FOREACH_SAFE(rt->mq[MQ_TYPE_MAILBOX], mq_elt, mq_tmp) { DL_DELETE(rt->mq[MQ_TYPE_MAILBOX], mq_elt); rt->mq_len[MQ_TYPE_MAILBOX] -= 1; topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id)); mq_dispatch_one_message(topic, mq_elt); DL_APPEND(rt->mq[MQ_TYPE_DEATH_LETTER], mq_elt); // move to dlq list rt->mq_len[MQ_TYPE_DEATH_LETTER] += 1; } } //mq_runtime_clean(rt); return; } //return 0 if success, otherwise return -1. int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void *on_msg_cb_arg) { struct mq_topic *topic = mq_schema_get_topic(s, topic_id); if(topic==NULL)return -1; struct mq_subscriber *new_subscriber = CALLOC(struct mq_subscriber,1); new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; new_subscriber->msg_cb = on_msg_cb; new_subscriber->msg_cb_arg = on_msg_cb_arg; DL_APPEND(topic->subscribers, new_subscriber); topic->subscriber_cnt+=1; s->mq_topic_subscriber_num+=1; return 0; } int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *data) { if(rt==NULL || rt->schema == NULL)return -1; if(rt->is_cleaning==true)return -1; struct mq_topic *topic = mq_schema_get_topic(rt->schema, topic_id); if(topic==NULL)return -1; struct mq_message *msg= CALLOC(struct mq_message,1); msg->rt=rt; msg->header.topic_id = topic_id; msg->body = data; DL_APPEND(rt->mq[MQ_TYPE_MAILBOX], msg); rt->mq_len[MQ_TYPE_MAILBOX]+=1; return 0; } struct mq_schema *mq_schema_new() { struct mq_schema *s = CALLOC(struct mq_schema,1); return s; } void mq_schema_free(struct mq_schema *s) { if(s==NULL)return; if(s->topic_array) { for (unsigned int i = 0; i < utarray_len(s->topic_array); i++) { mq_schema_destroy_topic(s, i); } utarray_free(s->topic_array); } FREE(s); return; } struct mq_runtime *mq_runtime_new(struct mq_schema *s) { if(s==NULL)return NULL; struct mq_runtime *rt = CALLOC(struct mq_runtime,1); rt->schema=s; rt->is_cleaning=false; return rt; } void mq_runtime_free(struct mq_runtime *rt) { if(rt==NULL)return; mq_runtime_clean(rt); FREE(rt); }