From e9825c3988575be0bc6b422568e81335eb3cb5ce Mon Sep 17 00:00:00 2001 From: yangwei Date: Tue, 10 Sep 2024 10:18:05 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=A6=84=20refactor(stellar=20api):=20split?= =?UTF-8?q?=20exdata=20and=20mq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- {infra/exdata => include/stellar}/exdata.h | 17 +- include/stellar/mq.h | 52 ++++ include/stellar/stellar.h | 30 +- include/stellar/stellar_exdata.h | 34 --- include/stellar/stellar_mq.h | 45 --- infra/CMakeLists.txt | 2 +- infra/exdata/exdata.c | 10 +- infra/exdata/exdata_internal.h | 2 +- infra/mq/CMakeLists.txt | 3 + infra/mq/mq.c | 212 +++++++++++++ infra/mq/mq_interna.h | 74 +++++ infra/plugin_manager/plugin_manager.c | 282 ++---------------- infra/plugin_manager/plugin_manager.h | 17 +- infra/plugin_manager/plugin_manager_interna.h | 99 +----- infra/stellar_core.c | 20 +- 15 files changed, 423 insertions(+), 476 deletions(-) rename {infra/exdata => include/stellar}/exdata.h (51%) create mode 100644 include/stellar/mq.h delete mode 100644 include/stellar/stellar_exdata.h delete mode 100644 include/stellar/stellar_mq.h create mode 100644 infra/mq/CMakeLists.txt create mode 100644 infra/mq/mq.c create mode 100644 infra/mq/mq_interna.h diff --git a/infra/exdata/exdata.h b/include/stellar/exdata.h similarity index 51% rename from infra/exdata/exdata.h rename to include/stellar/exdata.h index 941128a..8233c8b 100644 --- a/infra/exdata/exdata.h +++ b/include/stellar/exdata.h @@ -1,5 +1,10 @@ #pragma once +#ifdef __cplusplus +extern "C" +{ +#endif + typedef void exdata_free(int idx, void *ex_ptr, void *arg); struct exdata_schema; @@ -7,16 +12,20 @@ struct exdata_schema; struct exdata_schema *exdata_schema_new(); void exdata_schema_free(struct exdata_schema *s); -int exdata_new_index(struct exdata_schema *schema, const char *name, exdata_free *free_func,void *free_arg); +int exdata_schema_new_index(struct exdata_schema *schema, const char *name, exdata_free *free_func,void *free_arg); int exdata_schema_get_idx_by_name(struct exdata_schema *schema, const char *name); struct exdata_runtime; -struct exdata_runtime *exdata_handle_new(struct exdata_schema *h); -void exdata_handle_free(struct exdata_runtime *h); -void exdata_handle_reset(struct exdata_runtime *h); +struct exdata_runtime *exdata_runtime_new(struct exdata_schema *h); +void exdata_runtime_free(struct exdata_runtime *h); +void exdata_runtime_reset(struct exdata_runtime *h);//call free_func, and set ex_ptr to NULL int exdata_set(struct exdata_runtime *h, int idx, void *ex_ptr); void *exdata_get(struct exdata_runtime *h, int idx); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/include/stellar/mq.h b/include/stellar/mq.h new file mode 100644 index 0000000..57e860a --- /dev/null +++ b/include/stellar/mq.h @@ -0,0 +1,52 @@ +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +struct mq_schema; +struct mq_schema *mq_schema_new(); +void mq_schema_free(struct mq_schema *s); + +struct mq_runtime; +struct mq_runtime *mq_runtime_new(struct mq_schema *s); +void mq_runtime_free(struct mq_runtime *s); + +typedef void mq_msg_free_cb_func(void *msg, void *msg_free_arg); +typedef void on_msg_cb_func(int topic_id, const void *msg, void *on_msg_arg); +typedef void on_msg_dispatch_cb_func(int topic_id, + const void *msg, + on_msg_cb_func* on_msg_cb, + void *on_msg_cb_arg, + void *dispatch_arg); + +//return topic_id +int mq_schema_create_topic(struct mq_schema *s, + const char *topic_name, + on_msg_dispatch_cb_func *on_dispatch_cb, + void *on_dispatch_arg, + mq_msg_free_cb_func *msg_free_cb, + void *msg_free_arg); + +int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name); + +int mq_schema_update_topic(struct mq_schema *s, + int topic_id, + on_msg_dispatch_cb_func *on_dispatch_cb, + void *on_dispatch_arg, + mq_msg_free_cb_func *msg_free_cb, + void *msg_free_arg); + +int mq_schema_destroy_topic(struct mq_schema *s, int topic_id); + +//return 0 if success, otherwise return -1. +int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void * on_msg_cb_arg); + + +int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *msg); +void mq_runtime_dispatch(struct mq_runtime *rt); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index 63f7ab3..cc66cfe 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -7,32 +7,24 @@ extern "C" #include -struct stellar; +#include "stellar/mq.h" +#include "stellar/log.h" +#include "stellar/packet.h" -/********************************************** - * PLUGIN API * - **********************************************/ +struct stellar; //return plugin_env typedef void *plugin_on_load_func(struct stellar *st); typedef void plugin_on_unload_func(void *plugin_env); -/********************************************** - * PLUGIN EVENT API * - **********************************************/ -struct packet; typedef void plugin_on_packet_func(struct packet *pkt, void *on_packet_cb_arg); +//return 0 if success, otherwise return -1. +int stellar_raw_packet_subscribe(struct stellar *st, plugin_on_packet_func *on_packet_cb, void *on_packet_cb_arg); -int stellar_on_raw_packet_register(struct stellar *st, plugin_on_packet_func *on_packet_cb, void *on_packet_cb_arg); - -//return polling work result, 0: no work, 1: work +//return on_polling state, 0: idle, 1: working typedef int plugin_on_polling_func(void *polling_arg); -//return polling plugin_id -int stellar_on_polling_register(struct stellar *st, plugin_on_polling_func on_polling, void *polling_arg); - -/********************************************** - * STELLAR DEV API * - **********************************************/ +//return 0 if success, otherwise return -1. +int stellar_polling_subscribe(struct stellar *st, plugin_on_polling_func on_polling, void *polling_arg); void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str); @@ -41,7 +33,6 @@ uint16_t stellar_get_current_thread_index(); // only send user build packet, can't send packet which come from network void stellar_send_build_packet(struct stellar *st, struct packet *pkt); -struct stellar; struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg_file, const char *log_cfg_file); void stellar_run(struct stellar *st); void stellar_free(struct stellar *st); @@ -49,6 +40,9 @@ void stellar_loopbreak(struct stellar *st); void stellar_reload_log_level(struct stellar *st); struct logger *stellar_get_logger(struct stellar *st); +struct mq_schema *stellar_get_mq_schema(struct stellar *st); +struct mq_runtime *stellar_get_mq_runtime(struct stellar *st); + #ifdef __cplusplus } #endif diff --git a/include/stellar/stellar_exdata.h b/include/stellar/stellar_exdata.h deleted file mode 100644 index 8ed67bb..0000000 --- a/include/stellar/stellar_exdata.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include "utils.h" -#include "stellar.h" - -#ifdef __cplusplus -extern "C" -{ -#endif - -typedef void stellar_exdata_free(int idx, void *ex_ptr, void *arg); - - -inline static void stellar_exdata_free_default(int idx __unused, void *ex_ptr, void *arg __unused) -{ - if(ex_ptr)FREE(ex_ptr); -} - -struct packet; -int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *arg); - -//packet exdata api -int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr); -void *packet_exdata_get(struct packet *pkt, int idx); - -struct session; - -//session exdata api -int session_exdata_set(struct session *sess, int idx, void *ex_ptr); -void *session_exdata_get(struct session *sess, int idx); - -#ifdef __cplusplus -} -#endif \ No newline at end of file diff --git a/include/stellar/stellar_mq.h b/include/stellar/stellar_mq.h deleted file mode 100644 index 3d27c9f..0000000 --- a/include/stellar/stellar_mq.h +++ /dev/null @@ -1,45 +0,0 @@ -#pragma once - -#include "utils.h" -#include "stellar.h" - -#ifdef __cplusplus -extern "C" -{ -#endif - -//topic api -typedef void stellar_msg_free_cb_func(void *msg, void *msg_free_arg); - -inline static void stellar_msg_free_default(void *msg, void *msg_free_arg __unused) -{ - if(msg)FREE(msg); -} - -typedef void on_msg_cb_func(int topic_id, const void *msg, void *on_msg_arg); -typedef void on_msg_dispatch_cb_func(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *on_msg_cb_arg, void *dispatch_arg); - -//return topic_id -int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); -int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name); -int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); -int stellar_mq_destroy_topic(struct stellar *st, int topic_id); - - -enum stellar_mq_priority -{ - STELLAR_MQ_PRIORITY_LOW, - STELLAR_MQ_PRIORITY_NORMAL, - STELLAR_MQ_PRIORITY_HIGH, - STELLAR_MQ_PRIORITY_MAX, -}; - - -//return 0 if success, otherwise return -1. -int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *on_msg_cb, void * on_msg_cb_arg); -int stellar_mq_publish_message(struct stellar *st, int topic_id, void *msg); -int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *msg, enum stellar_mq_priority priority); - -#ifdef __cplusplus -} -#endif \ No newline at end of file diff --git a/infra/CMakeLists.txt b/infra/CMakeLists.txt index cc4974e..aec3d43 100644 --- a/infra/CMakeLists.txt +++ b/infra/CMakeLists.txt @@ -1,4 +1,4 @@ -set(INFRA exdata tuple packet_parser packet_io ip_reassembly tcp_reassembly session_manager plugin_manager) +set(INFRA exdata mq tuple packet_parser packet_io ip_reassembly tcp_reassembly session_manager plugin_manager) set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml) #set(DECODERS http lpi) set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS}) diff --git a/infra/exdata/exdata.c b/infra/exdata/exdata.c index e1b75df..e183c82 100644 --- a/infra/exdata/exdata.c +++ b/infra/exdata/exdata.c @@ -57,7 +57,7 @@ static void stellar_exdata_met_dtor(void *_elt) UT_icd stellar_exdata_meta_icd = {sizeof(struct exdata_meta), NULL, stellar_exdata_met_copy, stellar_exdata_met_dtor}; -int exdata_new_index(struct exdata_schema *s, const char *name, exdata_free *free_func,void *free_arg) +int exdata_schema_new_index(struct exdata_schema *s, const char *name, exdata_free *free_func,void *free_arg) { if(s==NULL || name==NULL)return -1; if(s->exdata_meta_array == NULL) @@ -90,7 +90,7 @@ int exdata_new_index(struct exdata_schema *s, const char *name, exdata_free *fre /******************************* * STELLAR EXDATA HANDLE API * *******************************/ -struct exdata_runtime *exdata_handle_new(struct exdata_schema *s) +struct exdata_runtime *exdata_runtime_new(struct exdata_schema *s) { if(s==NULL || s->exdata_meta_array==NULL)return NULL; struct exdata_runtime *h = CALLOC(struct exdata_runtime, 1); @@ -103,7 +103,7 @@ struct exdata_runtime *exdata_handle_new(struct exdata_schema *s) return h; } -void exdata_handle_reset(struct exdata_runtime *h) +void exdata_runtime_reset(struct exdata_runtime *h) { if(h==NULL||h->schema==NULL||h->exdata_array==NULL)return; unsigned int len=utarray_len(h->schema->exdata_meta_array); @@ -125,10 +125,10 @@ void exdata_handle_reset(struct exdata_runtime *h) return; } -void exdata_handle_free(struct exdata_runtime *h) +void exdata_runtime_free(struct exdata_runtime *h) { if(h==NULL)return; - exdata_handle_reset(h); + exdata_runtime_reset(h); if(h->exdata_array)FREE(h->exdata_array); FREE(h); } diff --git a/infra/exdata/exdata_internal.h b/infra/exdata/exdata_internal.h index 4b8e398..a4b7a4f 100644 --- a/infra/exdata/exdata_internal.h +++ b/infra/exdata/exdata_internal.h @@ -2,7 +2,7 @@ #include "uthash/utarray.h" -#include "exdata.h" +#include "stellar/exdata.h" struct exdata_schema { diff --git a/infra/mq/CMakeLists.txt b/infra/mq/CMakeLists.txt new file mode 100644 index 0000000..727073a --- /dev/null +++ b/infra/mq/CMakeLists.txt @@ -0,0 +1,3 @@ +add_library(mq mq.c) + +#add_subdirectory(test) \ No newline at end of file diff --git a/infra/mq/mq.c b/infra/mq/mq.c new file mode 100644 index 0000000..9f49eb1 --- /dev/null +++ b/infra/mq/mq.c @@ -0,0 +1,212 @@ +#include "mq_interna.h" + +#include "stellar/utils.h" + +#include "uthash/utlist.h" + +/******************************* + * STELLAR MQ * + *******************************/ +static void stellar_mq_topic_schema_copy(void *_dst, const void *_src) +{ + struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst, + *src = (struct stellar_mq_topic_schema *)_src; + memcpy(_dst, _src, sizeof(struct stellar_mq_topic_schema)); + dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL; +} + +static void stellar_mq_topic_schema_dtor(void *_elt) +{ + struct stellar_mq_topic_schema *elt = (struct stellar_mq_topic_schema *)_elt; + if (elt->topic_name) + FREE(elt->topic_name); + // FREE(elt); // free the item +} + +UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor}; + +int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name) +{ + if(topic_name == NULL || s == NULL || s->topic_array == NULL )return -1; + unsigned int len = utarray_len(s->topic_array); + struct stellar_mq_topic_schema *t_schema; + for(unsigned int i = 0; i < len; i++) + { + t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, i); + if(strcmp(t_schema->topic_name, topic_name) == 0) + { + return i; + } + } + return -1; +} + +int mq_schema_update_topic(struct mq_schema *s, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + if(s == NULL || s->topic_array == NULL)return -1; + unsigned int len = utarray_len(s->topic_array); + if(len < (unsigned int)topic_id)return -1; + struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); + if(t_schema == NULL)return -1; + t_schema->dispatch_cb=on_dispatch_cb; + t_schema->dispatch_cb_arg=on_dispatch_arg; + t_schema->free_cb=msg_free_cb; + t_schema->free_cb_arg=msg_free_arg; + return 0; +} + +int mq_schema_create_topic(struct mq_schema *s, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + if(s==NULL)return -1; + if(s->topic_array == NULL) + { + utarray_new(s->topic_array, &stellar_mq_topic_schema_icd); + } + unsigned int len = utarray_len(s->topic_array); + if(mq_schema_get_topic_id(s, topic_name) >= 0) + { + return -1; + } + struct stellar_mq_topic_schema t_schema; + memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema)); + t_schema.dispatch_cb=on_dispatch_cb; + t_schema.free_cb=msg_free_cb; + t_schema.topic_name=(char *)topic_name; + t_schema.topic_id=len;//topid_id equals arrary index + t_schema.dispatch_cb_arg=on_dispatch_arg; + t_schema.free_cb_arg=msg_free_arg; + t_schema.subscribers=NULL; + t_schema.subscriber_cnt=0; + utarray_push_back(s->topic_array, &t_schema); + s->stellar_mq_topic_num+=1; + return t_schema.topic_id; +} + +int mq_schema_destroy_topic(struct mq_schema *s, int topic_id) +{ + if(s==NULL)return -1; + if(s->topic_array==NULL)return -1; + unsigned int len = utarray_len(s->topic_array); + if (len <= (unsigned int)topic_id) + return -1; + struct stellar_mq_topic_schema *topic = + (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); + struct stellar_mq_subscriber *sub_elt, *sub_tmp; + + if(topic == NULL)return -1; + + if (topic->is_destroyed == 1)return 0; + + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + DL_DELETE(topic->subscribers, sub_elt); + FREE(sub_elt); + } + topic->is_destroyed = 1; + s->stellar_mq_topic_num-=1; + return 1; // success +} + + +static void stellar_mq_dispatch_one_message(struct mq_schema *s, struct stellar_message *mq_elt) +{ + struct stellar_mq_subscriber *sub_elt, *sub_tmp; + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, + (unsigned int)(mq_elt->header.topic_id)); + if (topic) + { + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + if (sub_elt->plugin_msg_cb) + { + if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, topic->dispatch_cb_arg, sub_elt->plugin_msg_cb_arg); + else sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->plugin_msg_cb_arg); + } + } + } +} + +static void mq_runtime_clean_dlq(struct mq_runtime *rt) +{ + struct stellar_message *mq_elt, *tmp; + struct stellar_mq_topic_schema *topic; + DL_FOREACH_SAFE(rt->dealth_letter_queue, mq_elt, tmp) + { + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(rt->schema->topic_array, + (unsigned int)(mq_elt->header.topic_id)); + if (topic && topic->free_cb) + { + topic->free_cb(mq_elt->body, topic->free_cb_arg); + } + DL_DELETE(rt->dealth_letter_queue, mq_elt); + FREE(mq_elt); + } +} + +void stellar_mq_dispatch(struct mq_runtime *rt) +{ + struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; + int cur_priority = STELLAR_MQ_PRIORITY_HIGH; + while(cur_priority >= STELLAR_MQ_PRIORITY_LOW) + { + if(rt->priority_mq[cur_priority]==NULL) + { + cur_priority--; + continue; + } + DL_FOREACH_SAFE(rt->priority_mq[cur_priority], mq_elt, mq_tmp) + { + stellar_mq_dispatch_one_message(rt->schema, mq_elt); + DL_DELETE(rt->priority_mq[mq_elt->header.priority], mq_elt); + DL_APPEND(rt->dealth_letter_queue, mq_elt); // move to dlq list + + cur_priority=STELLAR_MQ_PRIORITY_HIGH; + break; + } + } + mq_runtime_clean_dlq(rt); + return; +} + +//return 0 if success, otherwise return -1. +int stellar_mq_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void *on_msg_cb_arg) +{ + + if(s == NULL || s->topic_array == NULL)return -1; + + unsigned int len = utarray_len(s->topic_array); + if (len <= (unsigned int)topic_id)return -1; + + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); + if(topic==NULL)return -1; + + struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); + new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; + new_subscriber->plugin_msg_cb = on_msg_cb; + new_subscriber->plugin_msg_cb_arg = on_msg_cb_arg; + DL_APPEND(topic->subscribers, new_subscriber); + + topic->subscriber_cnt+=1; + s->mq_topic_subscriber_num+=1; + return 0; +} + +int stellar_mq_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum stellar_mq_priority priority) +{ + if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1; + + unsigned int len = utarray_len(rt->schema->topic_array); + if (len <= (unsigned int)topic_id)return -1; + struct stellar_message *msg= CALLOC(struct stellar_message,1); + msg->rt=rt; + msg->header.topic_id = topic_id; + msg->header.priority = priority; + msg->body = data; + DL_APPEND(rt->priority_mq[priority], msg); + return 0; +} + +int stellar_mq_publish_message(struct mq_runtime *rt, int topic_id, void *data) +{ + return stellar_mq_publish_message_with_priority(rt, topic_id, data, STELLAR_MQ_PRIORITY_MEDIUM); +} \ No newline at end of file diff --git a/infra/mq/mq_interna.h b/infra/mq/mq_interna.h new file mode 100644 index 0000000..1a46d3e --- /dev/null +++ b/infra/mq/mq_interna.h @@ -0,0 +1,74 @@ +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "stellar/mq.h" + +#include "uthash/utarray.h" + +struct mq_schema +{ + UT_array *topic_array; + int stellar_mq_topic_num; + int mq_topic_subscriber_num; +}; + +enum stellar_mq_priority +{ + STELLAR_MQ_PRIORITY_LOW = 0, + STELLAR_MQ_PRIORITY_MEDIUM, + STELLAR_MQ_PRIORITY_HIGH, + STELLAR_MQ_PRIORITY_MAX +}; + +struct stellar_message +{ + struct mq_runtime *rt; + struct + { + int topic_id; + enum stellar_mq_priority priority; + } header; + void *body; + struct stellar_message *next, *prev; +} __attribute__((aligned(sizeof(void *)))); + +typedef struct stellar_mq_subscriber +{ + int topic_subscriber_idx; + int plugin_idx; + on_msg_cb_func *plugin_msg_cb; + void *plugin_msg_cb_arg; + struct stellar_mq_subscriber *next, *prev; +}stellar_mq_subscriber __attribute__((aligned(sizeof(void*)))); + + +struct stellar_mq_topic_schema +{ + char *topic_name; + int topic_id; + int subscriber_cnt; + int is_destroyed; + on_msg_dispatch_cb_func *dispatch_cb; + void *dispatch_cb_arg; + mq_msg_free_cb_func *free_cb; + void *free_cb_arg; + struct stellar_mq_subscriber *subscribers; +}__attribute__((aligned(sizeof(void*)))); + + + +struct mq_runtime +{ + struct mq_schema *schema; + struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list + struct stellar_message *dealth_letter_queue;// dlq list +}; + + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/infra/plugin_manager/plugin_manager.c b/infra/plugin_manager/plugin_manager.c index 0c50319..f9773a5 100644 --- a/infra/plugin_manager/plugin_manager.c +++ b/infra/plugin_manager/plugin_manager.c @@ -1,16 +1,12 @@ #include "plugin_manager_interna.h" -#include "stellar/stellar_exdata.h" #include "stellar/utils.h" #include "toml/toml.h" -#include "uthash/utlist.h" - +#include #include #include "stellar_core.h" -#include "packet_private.h" -#include "session_private.h" - +#if 0 void stellar_per_stage_message_counter_incby(struct plugin_manager_schema *plug_mgr, int tid, long long increment) { plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=increment; @@ -26,6 +22,7 @@ bool stellar_per_stage_message_counter_overlimt(struct plugin_manager_schema *pl if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return true; return false; } +#endif UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; @@ -87,6 +84,7 @@ PLUGIN_SPEC_LOAD_ERROR: return NULL; } +#if 0 static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st) { if(st == NULL)return NULL; @@ -94,7 +92,6 @@ static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new struct plugin_manager_per_thread_data *per_thread_data = CALLOC(struct plugin_manager_per_thread_data, thread_num); return per_thread_data; } - static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread_data *per_thread_data, struct stellar *st) { if(per_thread_data == NULL || st == NULL)return; @@ -108,8 +105,9 @@ static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread FREE(per_thread_data); return; } +#endif -struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path, unsigned int max_msg_per_stage) +struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path) { int spec_num; struct plugin_specific *specs = plugin_specs_load(plugin_spec_file_path, &spec_num); @@ -118,7 +116,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char return NULL; } struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1); - plug_mgr->max_message_dispatch=max_msg_per_stage; + //plug_mgr->max_message_dispatch=max_msg_per_stage; if(spec_num > 0) { utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd); @@ -128,7 +126,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char plug_mgr->st = st; stellar_set_plugin_manger(st, plug_mgr); - plug_mgr->exdata_schema=exdata_schema_new(); + //plug_mgr->exdata_schema=exdata_schema_new(); for(int i = 0; i < spec_num; i++) { @@ -139,7 +137,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char } } FREE(specs); - plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st); + //plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st); return plug_mgr; } @@ -156,6 +154,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->plugin_load_specs_array); } +#if 0 if(plug_mgr->stellar_mq_schema_array) { for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++) @@ -164,6 +163,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->stellar_mq_schema_array); } + //if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array); if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array); if(plug_mgr->registered_packet_plugin_array) @@ -175,8 +175,9 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->registered_packet_plugin_array); } - plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st); - exdata_schema_free(plug_mgr->exdata_schema); +#endif + //plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st); + //exdata_schema_free(plug_mgr->exdata_schema); FREE(plug_mgr); return; } @@ -184,7 +185,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) /******************************* * STELLAR EXDATA * *******************************/ - +#if 0 int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *free_arg) { if(st==NULL || name==NULL)return -1; @@ -245,238 +246,10 @@ void *session_exdata_get(struct session *sess, int idx) if(sess_exdata == NULL)return NULL; return exdata_get(sess_exdata, idx); } - -/******************************* - * STELLAR MQ * - *******************************/ -static void stellar_mq_topic_schema_copy(void *_dst, const void *_src) -{ - struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst, - *src = (struct stellar_mq_topic_schema *)_src; - memcpy(_dst, _src, sizeof(struct stellar_mq_topic_schema)); - dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL; -} - -static void stellar_mq_topic_schema_dtor(void *_elt) -{ - struct stellar_mq_topic_schema *elt = (struct stellar_mq_topic_schema *)_elt; - if (elt->topic_name) - FREE(elt->topic_name); - // FREE(elt); // free the item -} - -UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor}; - -int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name) -{ - struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array; - - if(topic_name == NULL || mq_schema_array == NULL )return -1; - unsigned int len = utarray_len(mq_schema_array); - struct stellar_mq_topic_schema *t_schema; - for(unsigned int i = 0; i < len; i++) - { - t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, i); - if(strcmp(t_schema->topic_name, topic_name) == 0) - { - return i; - } - } - return -1; -} - -int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) -{ - struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array; - if(mq_schema_array == NULL)return -1; - unsigned int len = utarray_len(mq_schema_array); - if(len < (unsigned int)topic_id)return -1; - struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id); - if(t_schema == NULL)return -1; - t_schema->dispatch_cb=on_dispatch_cb; - t_schema->dispatch_cb_arg=on_dispatch_arg; - t_schema->free_cb=msg_free_cb; - t_schema->free_cb_arg=msg_free_arg; - return 0; -} - -int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) -{ - struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr->stellar_mq_schema_array == NULL) - { - utarray_new(plug_mgr->stellar_mq_schema_array, &stellar_mq_topic_schema_icd); - } - unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); - if(stellar_mq_get_topic_id(st, topic_name) >= 0) - { - return -1; - } - struct stellar_mq_topic_schema t_schema; - memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema)); - t_schema.dispatch_cb=on_dispatch_cb; - t_schema.free_cb=msg_free_cb; - t_schema.topic_name=(char *)topic_name; - t_schema.topic_id=len;//topid_id equals arrary index - t_schema.dispatch_cb_arg=on_dispatch_arg; - t_schema.free_cb_arg=msg_free_arg; - t_schema.subscribers=NULL; - t_schema.subscriber_cnt=0; - utarray_push_back(plug_mgr->stellar_mq_schema_array, &t_schema); - plug_mgr->stellar_mq_topic_num+=1; - return t_schema.topic_id; -} - -int stellar_mq_destroy_topic(struct stellar *st, int topic_id) -{ - struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr->stellar_mq_schema_array==NULL)return -1; - unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); - if (len <= (unsigned int)topic_id) - return -1; - struct stellar_mq_topic_schema *topic = - (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); - struct stellar_mq_subscriber *sub_elt, *sub_tmp; - - if(topic == NULL)return -1; - - if (topic->is_destroyed == 1)return 0; - - DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) - { - DL_DELETE(topic->subscribers, sub_elt); - FREE(sub_elt); - } - topic->is_destroyed = 1; - plug_mgr->stellar_mq_topic_num-=1; - return 1; // success -} +#endif -static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt) -{ - struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)stellar_get_plugin_manager(mq_elt->st); - struct stellar_mq_subscriber *sub_elt, *sub_tmp; - struct registered_plugin_schema *plugin_schema; - struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, - (unsigned int)(mq_elt->header.topic_id)); - if (topic) - { - DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) - { - if (sub_elt->plugin_msg_cb) - { - plugin_schema = (struct registered_plugin_schema *)utarray_eltptr( - plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx); - if (plugin_schema) - { - if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, topic->dispatch_cb_arg, plugin_schema->plugin_env); - else sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, plugin_schema->plugin_env); - } - } - } - } -} - -static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue) -{ - struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; - int cur_priority = STELLAR_MQ_PRIORITY_HIGH; - while(cur_priority >= STELLAR_MQ_PRIORITY_LOW) - { - if(priority_mq[cur_priority]==NULL) - { - cur_priority--; - continue; - } - DL_FOREACH_SAFE(priority_mq[cur_priority], mq_elt, mq_tmp) - { - stellar_mq_dispatch_one_message(mq_elt); - DL_DELETE(priority_mq[mq_elt->header.priority], mq_elt); - DL_APPEND(*dealth_letter_queue, mq_elt); // move to dlq list - - cur_priority=STELLAR_MQ_PRIORITY_HIGH; - break; - } - } - return; -} - -static void stellar_mq_free(struct stellar_message **head, UT_array *mq_schema_array) -{ - struct stellar_message *mq_elt, *tmp; - struct stellar_mq_topic_schema *topic; - DL_FOREACH_SAFE(*head, mq_elt, tmp) - { - topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, - (unsigned int)(mq_elt->header.topic_id)); - if (topic && topic->free_cb) - { - topic->free_cb(mq_elt->body, topic->free_cb_arg); - } - DL_DELETE(*head, mq_elt); - FREE(mq_elt); - } -} - -UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL}; - -//return 0 if success, otherwise return -1. -int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *on_msg_cb, void *on_msg_cb_arg) -{ - - struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1; - - unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); - if (len <= (unsigned int)topic_id)return -1; - - struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); - if(topic==NULL)return -1; - - struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); - new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; - new_subscriber->plugin_msg_cb = on_msg_cb; - new_subscriber->plugin_msg_cb_arg = on_msg_cb_arg; - DL_APPEND(topic->subscribers, new_subscriber); - - struct stellar_mq_subscriber_info sub_info; - memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info)); - sub_info.topic_id=topic_id; - sub_info.subscriber_idx=topic->subscriber_cnt; - topic->subscriber_cnt+=1; - plug_mgr->mq_topic_subscriber_num+=1; - return 0; -} - -int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *data, enum stellar_mq_priority priority) -{ - if(st==NULL)return -1; - struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)stellar_get_plugin_manager(st); - if(plug_mgr==NULL || plug_mgr->stellar_mq_schema_array == NULL)return -1; - - int tid = stellar_get_current_thread_index(); - if(stellar_per_stage_message_counter_overlimt(plug_mgr, tid)==true)return -1; - - unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); - if (len <= (unsigned int)topic_id)return -1; - struct stellar_message *msg= CALLOC(struct stellar_message,1); - msg->st=plug_mgr->st; - msg->header.topic_id = topic_id; - msg->header.priority = priority; - msg->body = data; - DL_APPEND(plug_mgr->per_thread_data[tid].priority_mq[priority], msg); - stellar_per_stage_message_counter_incby(plug_mgr, tid, 1); - return 0; -} - -int stellar_mq_publish_message(struct stellar *st, int topic_id, void *data) -{ - return stellar_mq_publish_message_with_priority(st, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL); -} - +#if 0 /******************************* * PLUGIN MANAGER SESSION RUNTIME * *******************************/ @@ -491,7 +264,6 @@ void session_exdata_runtime_free(struct exdata_runtime *exdata_h) return exdata_handle_free(exdata_h); } - /********************************************* * PLUGIN MANAGER PLUGIN * *********************************************/ @@ -518,8 +290,8 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; struct registered_plugin_schema *p=NULL; - int tid=stellar_get_current_thread_index(); - stellar_per_stage_message_counter_set(plug_mgr, tid, 0); + //int tid=stellar_get_current_thread_index(); + //stellar_per_stage_message_counter_set(plug_mgr, tid, 0); while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) { if(p->on_packet[in_out]) @@ -527,7 +299,7 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str p->on_packet[in_out](pkt, p->plugin_env); } } - stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue); + //stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue); return; } @@ -540,11 +312,11 @@ void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, str { if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT); - int tid=stellar_get_current_thread_index(); - stellar_per_stage_message_counter_set(plug_mgr, tid, -1); - stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue, - plug_mgr->stellar_mq_schema_array); - per_thread_packet_exdata_arrary_clean(plug_mgr); + //int tid=stellar_get_current_thread_index(); + //stellar_per_stage_message_counter_set(plug_mgr, tid, -1); + //stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue, + // plug_mgr->stellar_mq_schema_array); + //per_thread_packet_exdata_arrary_clean(plug_mgr); } /********************************************* @@ -552,7 +324,7 @@ void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, str *********************************************/ UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL}; -int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env) +int stellar_on_polling_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env) { struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); if(plug_mgr->registered_polling_plugin_array == NULL) @@ -584,3 +356,5 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr) } return polling_state; } + +#endif \ No newline at end of file diff --git a/infra/plugin_manager/plugin_manager.h b/infra/plugin_manager/plugin_manager.h index 1ecb68c..f2e9147 100644 --- a/infra/plugin_manager/plugin_manager.h +++ b/infra/plugin_manager/plugin_manager.h @@ -7,22 +7,19 @@ extern "C" { #endif -#define MAX_MSG_PER_STAGE 256 - struct plugin_manager_schema; struct plugin_manager_runtime; -struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path, unsigned int max_msg_per_stage); +struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path); void plugin_manager_exit(struct plugin_manager_schema *plug_mgr); -void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt); -void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt); -//return polling work state, 0: idle, 1: working -int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr); +//TODO +void *plugin_manager_get_plugin_env(const char *plugin_name); -struct exdata_runtime; -struct exdata_runtime *session_exdata_runtime_new(struct stellar *st); -void session_exdata_runtime_free(struct exdata_runtime *exdata_h); +//void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt); +//void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt); +//return polling work state, 0: idle, 1: working +//int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr); #ifdef __cplusplus } diff --git a/infra/plugin_manager/plugin_manager_interna.h b/infra/plugin_manager/plugin_manager_interna.h index 74b1662..b918e0e 100644 --- a/infra/plugin_manager/plugin_manager_interna.h +++ b/infra/plugin_manager/plugin_manager_interna.h @@ -5,109 +5,20 @@ extern "C" { #endif -#include "plugin_manager.h" - #include "stellar/stellar.h" -#include "stellar/stellar_mq.h" #include "uthash/utarray.h" -#include "exdata/exdata.h" - -struct stellar_message; - - -struct plugin_manager_per_thread_data -{ - struct exdata_runtime *exdata_array; - struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list - struct stellar_message *dealth_letter_queue;// dlq list - unsigned long long pub_packet_msg_cnt; -}; - -struct plugin_manager_schema -{ - struct stellar *st; - UT_array *plugin_load_specs_array; - struct exdata_schema *exdata_schema; - UT_array *stellar_mq_schema_array; - UT_array *registered_packet_plugin_array; - UT_array *registered_polling_plugin_array; - int stellar_mq_topic_num; - int mq_topic_subscriber_num; - unsigned int max_message_dispatch; - struct plugin_manager_per_thread_data *per_thread_data; -}__attribute__((aligned(sizeof(void*)))); - - -struct stellar_message -{ - struct stellar *st; - struct - { - int topic_id; - enum stellar_mq_priority priority; - } header; - void *body; - struct stellar_message *next, *prev; -} __attribute__((aligned(sizeof(void *)))); - -typedef struct stellar_mq_subscriber -{ - int topic_subscriber_idx; - int plugin_idx; - on_msg_cb_func *plugin_msg_cb; - void *plugin_msg_cb_arg; - struct stellar_mq_subscriber *next, *prev; -}stellar_mq_subscriber __attribute__((aligned(sizeof(void*)))); - - -struct stellar_mq_topic_schema -{ - char *topic_name; - int topic_id; - int subscriber_cnt; - int is_destroyed; - on_msg_dispatch_cb_func *dispatch_cb; - void *dispatch_cb_arg; - stellar_msg_free_cb_func *free_cb; - void *free_cb_arg; - struct stellar_mq_subscriber *subscribers; -}__attribute__((aligned(sizeof(void*)))); - - -enum packet_stage -{ - PACKET_STAGE_INPUT=0, - PACKET_STAGE_OUTPUT, - PACKET_STAGE_MAX -}; - -struct registered_plugin_schema -{ - char ip_protocol; - plugin_on_packet_func *on_packet[PACKET_STAGE_MAX]; - void *plugin_env; - UT_array *registed_mq_subscriber_info; -}__attribute__((aligned(sizeof(void*)))); - -struct registered_polling_plugin_schema -{ - plugin_on_polling_func *on_polling; - void *plugin_env; -}__attribute__((aligned(sizeof(void*)))); - -struct stellar_mq_subscriber_info -{ - int topic_id; - int subscriber_idx; -}__attribute__((aligned(sizeof(void*)))); /******************************* * PLUGIN MANAGER INIT & EXIT * *******************************/ -#include +struct plugin_manager_schema +{ + struct stellar *st; + UT_array *plugin_load_specs_array; +}__attribute__((aligned(sizeof(void*)))); struct plugin_specific { diff --git a/infra/stellar_core.c b/infra/stellar_core.c index ac307aa..f72574e 100644 --- a/infra/stellar_core.c +++ b/infra/stellar_core.c @@ -109,7 +109,7 @@ static inline void clean_session(struct session_manager *sess_mgr, uint64_t now_ for (uint64_t j = 0; j < nr_sess_cleaned; j++) { sess = cleaned_sess[j]; - session_exdata_runtime_free(session_get_user_data(sess)); + //session_exdata_runtime_free(session_get_user_data(sess)); session_manager_free_session(sess_mgr, sess); } } @@ -183,7 +183,7 @@ static void *worker_thread(void *arg) defraged_pkt = NULL; pkt = &packets[i]; - plugin_manager_on_packet_input(plug_mgr, pkt); + //plugin_manager_on_packet_input(plug_mgr, pkt); if (packet_is_fragment(pkt)) { defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now_ms); @@ -194,7 +194,7 @@ static void *worker_thread(void *arg) else { pkt = defraged_pkt; - plugin_manager_on_packet_input(plug_mgr, defraged_pkt); + //plugin_manager_on_packet_input(plug_mgr, defraged_pkt); } } @@ -206,8 +206,8 @@ static void *worker_thread(void *arg) { goto fast_path; } - struct exdata_runtime *per_sess_exdata=session_exdata_runtime_new(st); - session_set_user_data(sess, per_sess_exdata); + //struct exdata_runtime *per_sess_exdata=session_exdata_runtime_new(st); + //session_set_user_data(sess, per_sess_exdata); } else { @@ -220,12 +220,12 @@ static void *worker_thread(void *arg) fast_path: if (pkt == defraged_pkt) { - plugin_manager_on_packet_output(plug_mgr, defraged_pkt); - plugin_manager_on_packet_output(plug_mgr, &packets[i]); + //plugin_manager_on_packet_output(plug_mgr, defraged_pkt); + //plugin_manager_on_packet_output(plug_mgr, &packets[i]); } else { - plugin_manager_on_packet_output(plug_mgr, pkt); + //plugin_manager_on_packet_output(plug_mgr, pkt); } if (sess) @@ -272,7 +272,7 @@ static void *worker_thread(void *arg) idle_tasks: clean_session(sess_mgr, now_ms); ip_reassembly_expire(ip_reass, now_ms); - plugin_manager_on_polling(plug_mgr); + //plugin_manager_on_polling(plug_mgr); stellar_stat_merge(runtime->stat, &thr_stat, thr_idx, now_ms); if (nr_pkt_received == 0) @@ -448,7 +448,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg CORE_LOG_ERROR("unable to create stellar stat"); goto error_out; } - runtime->plug_mgr = plugin_manager_init(st, plugin_cfg_file, MAX_MSG_PER_STAGE); + runtime->plug_mgr = plugin_manager_init(st, plugin_cfg_file); if (runtime->plug_mgr == NULL) { CORE_LOG_ERROR("unable to create plugin manager");