#include #include #include #include #include #include "dns_decoder_perf_dummy.h" #define MAX_SUBSCRIBE_TOPIC_NUM 16 #define MAX_MESSGEA_TOPIC_NUM 16 #define MAX_STELLAR_PLUGIN_NUM 16 struct stellar_plugin { void *plugin_env; void *per_session_ctx; session_ctx_new_func *session_ctx_new; session_ctx_free_func *session_ctx_free; }; struct message_topic { char *name; void *msg_free_arg; int on_msg_cb_idx; session_msg_free_cb_func *msg_free_cb; int on_msg_plugin_id[MAX_SUBSCRIBE_TOPIC_NUM]; on_session_msg_cb_func *plugin_on_msg_cb[MAX_SUBSCRIBE_TOPIC_NUM]; }; struct stellar { int topic_idx; int plugin_idx; struct message_topic topic[MAX_MESSGEA_TOPIC_NUM]; struct stellar_plugin plugin[MAX_STELLAR_PLUGIN_NUM]; }; struct session { int tid; int topic_idx; int plugin_idx; struct message_topic *topic; struct stellar_plugin *plugin; struct stellar_packet *curr_msg; }; const char *session_get0_current_payload(struct session *ss, size_t *payload_sz) { if(ss->curr_msg==NULL) { *payload_sz=0; return NULL; } *payload_sz=ss->curr_msg->payload_sz; return (const char *)ss->curr_msg->payload; } int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { int topic_id=st->topic_idx++; st->topic[topic_id].name=strdup(topic_name); st->topic[topic_id].msg_free_cb=msg_free_cb; st->topic[topic_id].msg_free_arg=msg_free_arg; return topic_id; } int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name) { for(int i=0; itopic_idx; i++) { if(strcmp(st->topic[i].name, topic_name)==0) { return i; } } return -1; } int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id) { st->topic[topic_id].on_msg_plugin_id[st->topic[topic_id].on_msg_cb_idx]=plugin_id; st->topic[topic_id].plugin_on_msg_cb[st->topic[topic_id].on_msg_cb_idx]=plugin_on_msg_cb; st->topic[topic_id].on_msg_cb_idx++; return 0; } int session_mq_publish_message(struct session *ss, int topic_id, void *msg) { for(int i=0; itopic[topic_id].on_msg_cb_idx; i++) { int plugin_id=ss->topic[topic_id].on_msg_plugin_id[i]; ss->topic[topic_id].plugin_on_msg_cb[i](ss, topic_id, msg, ss->plugin[plugin_id].per_session_ctx, ss->plugin[plugin_id].plugin_env); } return 0; } int session_mq_publish_message_by_name(struct session *ss, const char *topic_name, struct stellar_packet *msg) { for(int i=0; itopic_idx; i++) { if(strcmp(ss->topic[i].name, topic_name)==0) { ss->curr_msg=msg; session_mq_publish_message(ss, i, (void *)msg); break; } } return 0; } int stellar_session_plugin_register(struct stellar *st, session_ctx_new_func session_ctx_new, session_ctx_free_func session_ctx_free, void *plugin_env) { int plugin_id=st->plugin_idx++; st->plugin[plugin_id].plugin_env=plugin_env; st->plugin[plugin_id].session_ctx_new=session_ctx_new; st->plugin[plugin_id].session_ctx_free=session_ctx_free; return plugin_id; } struct session *stellar_session_new(struct stellar *st, int tid) { struct session *ss=(struct session *)malloc(sizeof(struct session)); ss->tid=tid; ss->plugin=st->plugin; ss->plugin_idx=st->plugin_idx; ss->topic=st->topic; ss->topic_idx=st->topic_idx; for(int i=0; iplugin_idx; i++) { st->plugin[i].per_session_ctx=st->plugin[i].session_ctx_new(ss, st->plugin[i].plugin_env); } return ss; } void stellar_session_free(struct session *ss) { for(int i=0; iplugin_idx; i++) { ss->plugin[i].session_ctx_free(ss, ss->plugin[i].per_session_ctx, ss->plugin[i].plugin_env); } free(ss); } struct stellar *stellar_init(int worker_thread_num) { struct stellar *st=(struct stellar *)malloc(sizeof(struct stellar)); st->topic_idx=0; st->plugin_idx=0; return st; }