🦄 refactor(stellar api): split exdata and mq

This commit is contained in:
yangwei
2024-09-10 10:18:05 +08:00
parent 6403e832de
commit e9825c3988
15 changed files with 423 additions and 476 deletions

View File

@@ -1,5 +1,10 @@
#pragma once #pragma once
#ifdef __cplusplus
extern "C"
{
#endif
typedef void exdata_free(int idx, void *ex_ptr, void *arg); typedef void exdata_free(int idx, void *ex_ptr, void *arg);
struct exdata_schema; struct exdata_schema;
@@ -7,16 +12,20 @@ struct exdata_schema;
struct exdata_schema *exdata_schema_new(); struct exdata_schema *exdata_schema_new();
void exdata_schema_free(struct exdata_schema *s); void exdata_schema_free(struct exdata_schema *s);
int exdata_new_index(struct exdata_schema *schema, const char *name, exdata_free *free_func,void *free_arg); int exdata_schema_new_index(struct exdata_schema *schema, const char *name, exdata_free *free_func,void *free_arg);
int exdata_schema_get_idx_by_name(struct exdata_schema *schema, const char *name); int exdata_schema_get_idx_by_name(struct exdata_schema *schema, const char *name);
struct exdata_runtime; struct exdata_runtime;
struct exdata_runtime *exdata_handle_new(struct exdata_schema *h); struct exdata_runtime *exdata_runtime_new(struct exdata_schema *h);
void exdata_handle_free(struct exdata_runtime *h); void exdata_runtime_free(struct exdata_runtime *h);
void exdata_handle_reset(struct exdata_runtime *h); void exdata_runtime_reset(struct exdata_runtime *h);//call free_func, and set ex_ptr to NULL
int exdata_set(struct exdata_runtime *h, int idx, void *ex_ptr); int exdata_set(struct exdata_runtime *h, int idx, void *ex_ptr);
void *exdata_get(struct exdata_runtime *h, int idx); void *exdata_get(struct exdata_runtime *h, int idx);
#ifdef __cplusplus
}
#endif

52
include/stellar/mq.h Normal file
View File

@@ -0,0 +1,52 @@
#pragma once
#ifdef __cplusplus
extern "C"
{
#endif
struct mq_schema;
struct mq_schema *mq_schema_new();
void mq_schema_free(struct mq_schema *s);
struct mq_runtime;
struct mq_runtime *mq_runtime_new(struct mq_schema *s);
void mq_runtime_free(struct mq_runtime *s);
typedef void mq_msg_free_cb_func(void *msg, void *msg_free_arg);
typedef void on_msg_cb_func(int topic_id, const void *msg, void *on_msg_arg);
typedef void on_msg_dispatch_cb_func(int topic_id,
const void *msg,
on_msg_cb_func* on_msg_cb,
void *on_msg_cb_arg,
void *dispatch_arg);
//return topic_id
int mq_schema_create_topic(struct mq_schema *s,
const char *topic_name,
on_msg_dispatch_cb_func *on_dispatch_cb,
void *on_dispatch_arg,
mq_msg_free_cb_func *msg_free_cb,
void *msg_free_arg);
int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name);
int mq_schema_update_topic(struct mq_schema *s,
int topic_id,
on_msg_dispatch_cb_func *on_dispatch_cb,
void *on_dispatch_arg,
mq_msg_free_cb_func *msg_free_cb,
void *msg_free_arg);
int mq_schema_destroy_topic(struct mq_schema *s, int topic_id);
//return 0 if success, otherwise return -1.
int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void * on_msg_cb_arg);
int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *msg);
void mq_runtime_dispatch(struct mq_runtime *rt);
#ifdef __cplusplus
}
#endif

View File

@@ -7,32 +7,24 @@ extern "C"
#include <stdint.h> #include <stdint.h>
struct stellar; #include "stellar/mq.h"
#include "stellar/log.h"
#include "stellar/packet.h"
/********************************************** struct stellar;
* PLUGIN API *
**********************************************/
//return plugin_env //return plugin_env
typedef void *plugin_on_load_func(struct stellar *st); typedef void *plugin_on_load_func(struct stellar *st);
typedef void plugin_on_unload_func(void *plugin_env); typedef void plugin_on_unload_func(void *plugin_env);
/**********************************************
* PLUGIN EVENT API *
**********************************************/
struct packet;
typedef void plugin_on_packet_func(struct packet *pkt, void *on_packet_cb_arg); typedef void plugin_on_packet_func(struct packet *pkt, void *on_packet_cb_arg);
//return 0 if success, otherwise return -1.
int stellar_raw_packet_subscribe(struct stellar *st, plugin_on_packet_func *on_packet_cb, void *on_packet_cb_arg);
int stellar_on_raw_packet_register(struct stellar *st, plugin_on_packet_func *on_packet_cb, void *on_packet_cb_arg); //return on_polling state, 0: idle, 1: working
//return polling work result, 0: no work, 1: work
typedef int plugin_on_polling_func(void *polling_arg); typedef int plugin_on_polling_func(void *polling_arg);
//return polling plugin_id //return 0 if success, otherwise return -1.
int stellar_on_polling_register(struct stellar *st, plugin_on_polling_func on_polling, void *polling_arg); int stellar_polling_subscribe(struct stellar *st, plugin_on_polling_func on_polling, void *polling_arg);
/**********************************************
* STELLAR DEV API *
**********************************************/
void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str); void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str);
@@ -41,7 +33,6 @@ uint16_t stellar_get_current_thread_index();
// only send user build packet, can't send packet which come from network // only send user build packet, can't send packet which come from network
void stellar_send_build_packet(struct stellar *st, struct packet *pkt); void stellar_send_build_packet(struct stellar *st, struct packet *pkt);
struct stellar;
struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg_file, const char *log_cfg_file); struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg_file, const char *log_cfg_file);
void stellar_run(struct stellar *st); void stellar_run(struct stellar *st);
void stellar_free(struct stellar *st); void stellar_free(struct stellar *st);
@@ -49,6 +40,9 @@ void stellar_loopbreak(struct stellar *st);
void stellar_reload_log_level(struct stellar *st); void stellar_reload_log_level(struct stellar *st);
struct logger *stellar_get_logger(struct stellar *st); struct logger *stellar_get_logger(struct stellar *st);
struct mq_schema *stellar_get_mq_schema(struct stellar *st);
struct mq_runtime *stellar_get_mq_runtime(struct stellar *st);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@@ -1,34 +0,0 @@
#pragma once
#include "utils.h"
#include "stellar.h"
#ifdef __cplusplus
extern "C"
{
#endif
typedef void stellar_exdata_free(int idx, void *ex_ptr, void *arg);
inline static void stellar_exdata_free_default(int idx __unused, void *ex_ptr, void *arg __unused)
{
if(ex_ptr)FREE(ex_ptr);
}
struct packet;
int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *arg);
//packet exdata api
int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr);
void *packet_exdata_get(struct packet *pkt, int idx);
struct session;
//session exdata api
int session_exdata_set(struct session *sess, int idx, void *ex_ptr);
void *session_exdata_get(struct session *sess, int idx);
#ifdef __cplusplus
}
#endif

View File

@@ -1,45 +0,0 @@
#pragma once
#include "utils.h"
#include "stellar.h"
#ifdef __cplusplus
extern "C"
{
#endif
//topic api
typedef void stellar_msg_free_cb_func(void *msg, void *msg_free_arg);
inline static void stellar_msg_free_default(void *msg, void *msg_free_arg __unused)
{
if(msg)FREE(msg);
}
typedef void on_msg_cb_func(int topic_id, const void *msg, void *on_msg_arg);
typedef void on_msg_dispatch_cb_func(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *on_msg_cb_arg, void *dispatch_arg);
//return topic_id
int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name);
int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
int stellar_mq_destroy_topic(struct stellar *st, int topic_id);
enum stellar_mq_priority
{
STELLAR_MQ_PRIORITY_LOW,
STELLAR_MQ_PRIORITY_NORMAL,
STELLAR_MQ_PRIORITY_HIGH,
STELLAR_MQ_PRIORITY_MAX,
};
//return 0 if success, otherwise return -1.
int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *on_msg_cb, void * on_msg_cb_arg);
int stellar_mq_publish_message(struct stellar *st, int topic_id, void *msg);
int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *msg, enum stellar_mq_priority priority);
#ifdef __cplusplus
}
#endif

View File

@@ -1,4 +1,4 @@
set(INFRA exdata tuple packet_parser packet_io ip_reassembly tcp_reassembly session_manager plugin_manager) set(INFRA exdata mq tuple packet_parser packet_io ip_reassembly tcp_reassembly session_manager plugin_manager)
set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml) set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml)
#set(DECODERS http lpi) #set(DECODERS http lpi)
set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS}) set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS})

View File

@@ -57,7 +57,7 @@ static void stellar_exdata_met_dtor(void *_elt)
UT_icd stellar_exdata_meta_icd = {sizeof(struct exdata_meta), NULL, stellar_exdata_met_copy, stellar_exdata_met_dtor}; UT_icd stellar_exdata_meta_icd = {sizeof(struct exdata_meta), NULL, stellar_exdata_met_copy, stellar_exdata_met_dtor};
int exdata_new_index(struct exdata_schema *s, const char *name, exdata_free *free_func,void *free_arg) int exdata_schema_new_index(struct exdata_schema *s, const char *name, exdata_free *free_func,void *free_arg)
{ {
if(s==NULL || name==NULL)return -1; if(s==NULL || name==NULL)return -1;
if(s->exdata_meta_array == NULL) if(s->exdata_meta_array == NULL)
@@ -90,7 +90,7 @@ int exdata_new_index(struct exdata_schema *s, const char *name, exdata_free *fre
/******************************* /*******************************
* STELLAR EXDATA HANDLE API * * STELLAR EXDATA HANDLE API *
*******************************/ *******************************/
struct exdata_runtime *exdata_handle_new(struct exdata_schema *s) struct exdata_runtime *exdata_runtime_new(struct exdata_schema *s)
{ {
if(s==NULL || s->exdata_meta_array==NULL)return NULL; if(s==NULL || s->exdata_meta_array==NULL)return NULL;
struct exdata_runtime *h = CALLOC(struct exdata_runtime, 1); struct exdata_runtime *h = CALLOC(struct exdata_runtime, 1);
@@ -103,7 +103,7 @@ struct exdata_runtime *exdata_handle_new(struct exdata_schema *s)
return h; return h;
} }
void exdata_handle_reset(struct exdata_runtime *h) void exdata_runtime_reset(struct exdata_runtime *h)
{ {
if(h==NULL||h->schema==NULL||h->exdata_array==NULL)return; if(h==NULL||h->schema==NULL||h->exdata_array==NULL)return;
unsigned int len=utarray_len(h->schema->exdata_meta_array); unsigned int len=utarray_len(h->schema->exdata_meta_array);
@@ -125,10 +125,10 @@ void exdata_handle_reset(struct exdata_runtime *h)
return; return;
} }
void exdata_handle_free(struct exdata_runtime *h) void exdata_runtime_free(struct exdata_runtime *h)
{ {
if(h==NULL)return; if(h==NULL)return;
exdata_handle_reset(h); exdata_runtime_reset(h);
if(h->exdata_array)FREE(h->exdata_array); if(h->exdata_array)FREE(h->exdata_array);
FREE(h); FREE(h);
} }

View File

@@ -2,7 +2,7 @@
#include "uthash/utarray.h" #include "uthash/utarray.h"
#include "exdata.h" #include "stellar/exdata.h"
struct exdata_schema struct exdata_schema
{ {

3
infra/mq/CMakeLists.txt Normal file
View File

@@ -0,0 +1,3 @@
add_library(mq mq.c)
#add_subdirectory(test)

212
infra/mq/mq.c Normal file
View File

@@ -0,0 +1,212 @@
#include "mq_interna.h"
#include "stellar/utils.h"
#include "uthash/utlist.h"
/*******************************
* STELLAR MQ *
*******************************/
static void stellar_mq_topic_schema_copy(void *_dst, const void *_src)
{
struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst,
*src = (struct stellar_mq_topic_schema *)_src;
memcpy(_dst, _src, sizeof(struct stellar_mq_topic_schema));
dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
}
static void stellar_mq_topic_schema_dtor(void *_elt)
{
struct stellar_mq_topic_schema *elt = (struct stellar_mq_topic_schema *)_elt;
if (elt->topic_name)
FREE(elt->topic_name);
// FREE(elt); // free the item
}
UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor};
int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name)
{
if(topic_name == NULL || s == NULL || s->topic_array == NULL )return -1;
unsigned int len = utarray_len(s->topic_array);
struct stellar_mq_topic_schema *t_schema;
for(unsigned int i = 0; i < len; i++)
{
t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, i);
if(strcmp(t_schema->topic_name, topic_name) == 0)
{
return i;
}
}
return -1;
}
int mq_schema_update_topic(struct mq_schema *s, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
if(s == NULL || s->topic_array == NULL)return -1;
unsigned int len = utarray_len(s->topic_array);
if(len < (unsigned int)topic_id)return -1;
struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
if(t_schema == NULL)return -1;
t_schema->dispatch_cb=on_dispatch_cb;
t_schema->dispatch_cb_arg=on_dispatch_arg;
t_schema->free_cb=msg_free_cb;
t_schema->free_cb_arg=msg_free_arg;
return 0;
}
int mq_schema_create_topic(struct mq_schema *s, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
if(s==NULL)return -1;
if(s->topic_array == NULL)
{
utarray_new(s->topic_array, &stellar_mq_topic_schema_icd);
}
unsigned int len = utarray_len(s->topic_array);
if(mq_schema_get_topic_id(s, topic_name) >= 0)
{
return -1;
}
struct stellar_mq_topic_schema t_schema;
memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema));
t_schema.dispatch_cb=on_dispatch_cb;
t_schema.free_cb=msg_free_cb;
t_schema.topic_name=(char *)topic_name;
t_schema.topic_id=len;//topid_id equals arrary index
t_schema.dispatch_cb_arg=on_dispatch_arg;
t_schema.free_cb_arg=msg_free_arg;
t_schema.subscribers=NULL;
t_schema.subscriber_cnt=0;
utarray_push_back(s->topic_array, &t_schema);
s->stellar_mq_topic_num+=1;
return t_schema.topic_id;
}
int mq_schema_destroy_topic(struct mq_schema *s, int topic_id)
{
if(s==NULL)return -1;
if(s->topic_array==NULL)return -1;
unsigned int len = utarray_len(s->topic_array);
if (len <= (unsigned int)topic_id)
return -1;
struct stellar_mq_topic_schema *topic =
(struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
if(topic == NULL)return -1;
if (topic->is_destroyed == 1)return 0;
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
DL_DELETE(topic->subscribers, sub_elt);
FREE(sub_elt);
}
topic->is_destroyed = 1;
s->stellar_mq_topic_num-=1;
return 1; // success
}
static void stellar_mq_dispatch_one_message(struct mq_schema *s, struct stellar_message *mq_elt)
{
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array,
(unsigned int)(mq_elt->header.topic_id));
if (topic)
{
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
if (sub_elt->plugin_msg_cb)
{
if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, topic->dispatch_cb_arg, sub_elt->plugin_msg_cb_arg);
else sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->plugin_msg_cb_arg);
}
}
}
}
static void mq_runtime_clean_dlq(struct mq_runtime *rt)
{
struct stellar_message *mq_elt, *tmp;
struct stellar_mq_topic_schema *topic;
DL_FOREACH_SAFE(rt->dealth_letter_queue, mq_elt, tmp)
{
topic = (struct stellar_mq_topic_schema *)utarray_eltptr(rt->schema->topic_array,
(unsigned int)(mq_elt->header.topic_id));
if (topic && topic->free_cb)
{
topic->free_cb(mq_elt->body, topic->free_cb_arg);
}
DL_DELETE(rt->dealth_letter_queue, mq_elt);
FREE(mq_elt);
}
}
void stellar_mq_dispatch(struct mq_runtime *rt)
{
struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
int cur_priority = STELLAR_MQ_PRIORITY_HIGH;
while(cur_priority >= STELLAR_MQ_PRIORITY_LOW)
{
if(rt->priority_mq[cur_priority]==NULL)
{
cur_priority--;
continue;
}
DL_FOREACH_SAFE(rt->priority_mq[cur_priority], mq_elt, mq_tmp)
{
stellar_mq_dispatch_one_message(rt->schema, mq_elt);
DL_DELETE(rt->priority_mq[mq_elt->header.priority], mq_elt);
DL_APPEND(rt->dealth_letter_queue, mq_elt); // move to dlq list
cur_priority=STELLAR_MQ_PRIORITY_HIGH;
break;
}
}
mq_runtime_clean_dlq(rt);
return;
}
//return 0 if success, otherwise return -1.
int stellar_mq_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void *on_msg_cb_arg)
{
if(s == NULL || s->topic_array == NULL)return -1;
unsigned int len = utarray_len(s->topic_array);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
if(topic==NULL)return -1;
struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
new_subscriber->plugin_msg_cb = on_msg_cb;
new_subscriber->plugin_msg_cb_arg = on_msg_cb_arg;
DL_APPEND(topic->subscribers, new_subscriber);
topic->subscriber_cnt+=1;
s->mq_topic_subscriber_num+=1;
return 0;
}
int stellar_mq_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum stellar_mq_priority priority)
{
if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1;
unsigned int len = utarray_len(rt->schema->topic_array);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_message *msg= CALLOC(struct stellar_message,1);
msg->rt=rt;
msg->header.topic_id = topic_id;
msg->header.priority = priority;
msg->body = data;
DL_APPEND(rt->priority_mq[priority], msg);
return 0;
}
int stellar_mq_publish_message(struct mq_runtime *rt, int topic_id, void *data)
{
return stellar_mq_publish_message_with_priority(rt, topic_id, data, STELLAR_MQ_PRIORITY_MEDIUM);
}

74
infra/mq/mq_interna.h Normal file
View File

@@ -0,0 +1,74 @@
#pragma once
#ifdef __cplusplus
extern "C"
{
#endif
#include "stellar/mq.h"
#include "uthash/utarray.h"
struct mq_schema
{
UT_array *topic_array;
int stellar_mq_topic_num;
int mq_topic_subscriber_num;
};
enum stellar_mq_priority
{
STELLAR_MQ_PRIORITY_LOW = 0,
STELLAR_MQ_PRIORITY_MEDIUM,
STELLAR_MQ_PRIORITY_HIGH,
STELLAR_MQ_PRIORITY_MAX
};
struct stellar_message
{
struct mq_runtime *rt;
struct
{
int topic_id;
enum stellar_mq_priority priority;
} header;
void *body;
struct stellar_message *next, *prev;
} __attribute__((aligned(sizeof(void *))));
typedef struct stellar_mq_subscriber
{
int topic_subscriber_idx;
int plugin_idx;
on_msg_cb_func *plugin_msg_cb;
void *plugin_msg_cb_arg;
struct stellar_mq_subscriber *next, *prev;
}stellar_mq_subscriber __attribute__((aligned(sizeof(void*))));
struct stellar_mq_topic_schema
{
char *topic_name;
int topic_id;
int subscriber_cnt;
int is_destroyed;
on_msg_dispatch_cb_func *dispatch_cb;
void *dispatch_cb_arg;
mq_msg_free_cb_func *free_cb;
void *free_cb_arg;
struct stellar_mq_subscriber *subscribers;
}__attribute__((aligned(sizeof(void*))));
struct mq_runtime
{
struct mq_schema *schema;
struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list
struct stellar_message *dealth_letter_queue;// dlq list
};
#ifdef __cplusplus
}
#endif

View File

@@ -1,16 +1,12 @@
#include "plugin_manager_interna.h" #include "plugin_manager_interna.h"
#include "stellar/stellar_exdata.h"
#include "stellar/utils.h" #include "stellar/utils.h"
#include "toml/toml.h" #include "toml/toml.h"
#include "uthash/utlist.h" #include <dlfcn.h>
#include <stdbool.h> #include <stdbool.h>
#include "stellar_core.h" #include "stellar_core.h"
#include "packet_private.h"
#include "session_private.h"
#if 0
void stellar_per_stage_message_counter_incby(struct plugin_manager_schema *plug_mgr, int tid, long long increment) void stellar_per_stage_message_counter_incby(struct plugin_manager_schema *plug_mgr, int tid, long long increment)
{ {
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=increment; plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=increment;
@@ -26,6 +22,7 @@ bool stellar_per_stage_message_counter_overlimt(struct plugin_manager_schema *pl
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return true; if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return true;
return false; return false;
} }
#endif
UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
@@ -87,6 +84,7 @@ PLUGIN_SPEC_LOAD_ERROR:
return NULL; return NULL;
} }
#if 0
static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st) static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st)
{ {
if(st == NULL)return NULL; if(st == NULL)return NULL;
@@ -94,7 +92,6 @@ static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new
struct plugin_manager_per_thread_data *per_thread_data = CALLOC(struct plugin_manager_per_thread_data, thread_num); struct plugin_manager_per_thread_data *per_thread_data = CALLOC(struct plugin_manager_per_thread_data, thread_num);
return per_thread_data; return per_thread_data;
} }
static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread_data *per_thread_data, struct stellar *st) static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread_data *per_thread_data, struct stellar *st)
{ {
if(per_thread_data == NULL || st == NULL)return; if(per_thread_data == NULL || st == NULL)return;
@@ -108,8 +105,9 @@ static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread
FREE(per_thread_data); FREE(per_thread_data);
return; return;
} }
#endif
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path, unsigned int max_msg_per_stage) struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
{ {
int spec_num; int spec_num;
struct plugin_specific *specs = plugin_specs_load(plugin_spec_file_path, &spec_num); struct plugin_specific *specs = plugin_specs_load(plugin_spec_file_path, &spec_num);
@@ -118,7 +116,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
return NULL; return NULL;
} }
struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1); struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1);
plug_mgr->max_message_dispatch=max_msg_per_stage; //plug_mgr->max_message_dispatch=max_msg_per_stage;
if(spec_num > 0) if(spec_num > 0)
{ {
utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd); utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd);
@@ -128,7 +126,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
plug_mgr->st = st; plug_mgr->st = st;
stellar_set_plugin_manger(st, plug_mgr); stellar_set_plugin_manger(st, plug_mgr);
plug_mgr->exdata_schema=exdata_schema_new(); //plug_mgr->exdata_schema=exdata_schema_new();
for(int i = 0; i < spec_num; i++) for(int i = 0; i < spec_num; i++)
{ {
@@ -139,7 +137,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
} }
} }
FREE(specs); FREE(specs);
plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st); //plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st);
return plug_mgr; return plug_mgr;
} }
@@ -156,6 +154,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
} }
utarray_free(plug_mgr->plugin_load_specs_array); utarray_free(plug_mgr->plugin_load_specs_array);
} }
#if 0
if(plug_mgr->stellar_mq_schema_array) if(plug_mgr->stellar_mq_schema_array)
{ {
for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++) for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++)
@@ -164,6 +163,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
} }
utarray_free(plug_mgr->stellar_mq_schema_array); utarray_free(plug_mgr->stellar_mq_schema_array);
} }
//if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array); //if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array); if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
if(plug_mgr->registered_packet_plugin_array) if(plug_mgr->registered_packet_plugin_array)
@@ -175,8 +175,9 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
} }
utarray_free(plug_mgr->registered_packet_plugin_array); utarray_free(plug_mgr->registered_packet_plugin_array);
} }
plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st); #endif
exdata_schema_free(plug_mgr->exdata_schema); //plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
//exdata_schema_free(plug_mgr->exdata_schema);
FREE(plug_mgr); FREE(plug_mgr);
return; return;
} }
@@ -184,7 +185,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
/******************************* /*******************************
* STELLAR EXDATA * * STELLAR EXDATA *
*******************************/ *******************************/
#if 0
int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *free_arg) int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *free_arg)
{ {
if(st==NULL || name==NULL)return -1; if(st==NULL || name==NULL)return -1;
@@ -245,238 +246,10 @@ void *session_exdata_get(struct session *sess, int idx)
if(sess_exdata == NULL)return NULL; if(sess_exdata == NULL)return NULL;
return exdata_get(sess_exdata, idx); return exdata_get(sess_exdata, idx);
} }
#endif
/*******************************
* STELLAR MQ *
*******************************/
static void stellar_mq_topic_schema_copy(void *_dst, const void *_src)
{
struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst,
*src = (struct stellar_mq_topic_schema *)_src;
memcpy(_dst, _src, sizeof(struct stellar_mq_topic_schema));
dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
}
static void stellar_mq_topic_schema_dtor(void *_elt)
{
struct stellar_mq_topic_schema *elt = (struct stellar_mq_topic_schema *)_elt;
if (elt->topic_name)
FREE(elt->topic_name);
// FREE(elt); // free the item
}
UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor};
int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
if(topic_name == NULL || mq_schema_array == NULL )return -1;
unsigned int len = utarray_len(mq_schema_array);
struct stellar_mq_topic_schema *t_schema;
for(unsigned int i = 0; i < len; i++)
{
t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, i);
if(strcmp(t_schema->topic_name, topic_name) == 0)
{
return i;
}
}
return -1;
}
int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
if(mq_schema_array == NULL)return -1;
unsigned int len = utarray_len(mq_schema_array);
if(len < (unsigned int)topic_id)return -1;
struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id);
if(t_schema == NULL)return -1;
t_schema->dispatch_cb=on_dispatch_cb;
t_schema->dispatch_cb_arg=on_dispatch_arg;
t_schema->free_cb=msg_free_cb;
t_schema->free_cb_arg=msg_free_arg;
return 0;
}
int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->stellar_mq_schema_array == NULL)
{
utarray_new(plug_mgr->stellar_mq_schema_array, &stellar_mq_topic_schema_icd);
}
unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
if(stellar_mq_get_topic_id(st, topic_name) >= 0)
{
return -1;
}
struct stellar_mq_topic_schema t_schema;
memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema));
t_schema.dispatch_cb=on_dispatch_cb;
t_schema.free_cb=msg_free_cb;
t_schema.topic_name=(char *)topic_name;
t_schema.topic_id=len;//topid_id equals arrary index
t_schema.dispatch_cb_arg=on_dispatch_arg;
t_schema.free_cb_arg=msg_free_arg;
t_schema.subscribers=NULL;
t_schema.subscriber_cnt=0;
utarray_push_back(plug_mgr->stellar_mq_schema_array, &t_schema);
plug_mgr->stellar_mq_topic_num+=1;
return t_schema.topic_id;
}
int stellar_mq_destroy_topic(struct stellar *st, int topic_id)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->stellar_mq_schema_array==NULL)return -1;
unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
if (len <= (unsigned int)topic_id)
return -1;
struct stellar_mq_topic_schema *topic =
(struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
if(topic == NULL)return -1;
if (topic->is_destroyed == 1)return 0;
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
DL_DELETE(topic->subscribers, sub_elt);
FREE(sub_elt);
}
topic->is_destroyed = 1;
plug_mgr->stellar_mq_topic_num-=1;
return 1; // success
}
static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt) #if 0
{
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)stellar_get_plugin_manager(mq_elt->st);
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
struct registered_plugin_schema *plugin_schema;
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,
(unsigned int)(mq_elt->header.topic_id));
if (topic)
{
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
if (sub_elt->plugin_msg_cb)
{
plugin_schema = (struct registered_plugin_schema *)utarray_eltptr(
plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
if (plugin_schema)
{
if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, topic->dispatch_cb_arg, plugin_schema->plugin_env);
else sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, plugin_schema->plugin_env);
}
}
}
}
}
static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue)
{
struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
int cur_priority = STELLAR_MQ_PRIORITY_HIGH;
while(cur_priority >= STELLAR_MQ_PRIORITY_LOW)
{
if(priority_mq[cur_priority]==NULL)
{
cur_priority--;
continue;
}
DL_FOREACH_SAFE(priority_mq[cur_priority], mq_elt, mq_tmp)
{
stellar_mq_dispatch_one_message(mq_elt);
DL_DELETE(priority_mq[mq_elt->header.priority], mq_elt);
DL_APPEND(*dealth_letter_queue, mq_elt); // move to dlq list
cur_priority=STELLAR_MQ_PRIORITY_HIGH;
break;
}
}
return;
}
static void stellar_mq_free(struct stellar_message **head, UT_array *mq_schema_array)
{
struct stellar_message *mq_elt, *tmp;
struct stellar_mq_topic_schema *topic;
DL_FOREACH_SAFE(*head, mq_elt, tmp)
{
topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array,
(unsigned int)(mq_elt->header.topic_id));
if (topic && topic->free_cb)
{
topic->free_cb(mq_elt->body, topic->free_cb_arg);
}
DL_DELETE(*head, mq_elt);
FREE(mq_elt);
}
}
UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
//return 0 if success, otherwise return -1.
int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *on_msg_cb, void *on_msg_cb_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1;
unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
if(topic==NULL)return -1;
struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
new_subscriber->plugin_msg_cb = on_msg_cb;
new_subscriber->plugin_msg_cb_arg = on_msg_cb_arg;
DL_APPEND(topic->subscribers, new_subscriber);
struct stellar_mq_subscriber_info sub_info;
memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info));
sub_info.topic_id=topic_id;
sub_info.subscriber_idx=topic->subscriber_cnt;
topic->subscriber_cnt+=1;
plug_mgr->mq_topic_subscriber_num+=1;
return 0;
}
int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *data, enum stellar_mq_priority priority)
{
if(st==NULL)return -1;
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)stellar_get_plugin_manager(st);
if(plug_mgr==NULL || plug_mgr->stellar_mq_schema_array == NULL)return -1;
int tid = stellar_get_current_thread_index();
if(stellar_per_stage_message_counter_overlimt(plug_mgr, tid)==true)return -1;
unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_message *msg= CALLOC(struct stellar_message,1);
msg->st=plug_mgr->st;
msg->header.topic_id = topic_id;
msg->header.priority = priority;
msg->body = data;
DL_APPEND(plug_mgr->per_thread_data[tid].priority_mq[priority], msg);
stellar_per_stage_message_counter_incby(plug_mgr, tid, 1);
return 0;
}
int stellar_mq_publish_message(struct stellar *st, int topic_id, void *data)
{
return stellar_mq_publish_message_with_priority(st, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL);
}
/******************************* /*******************************
* PLUGIN MANAGER SESSION RUNTIME * * PLUGIN MANAGER SESSION RUNTIME *
*******************************/ *******************************/
@@ -491,7 +264,6 @@ void session_exdata_runtime_free(struct exdata_runtime *exdata_h)
return exdata_handle_free(exdata_h); return exdata_handle_free(exdata_h);
} }
/********************************************* /*********************************************
* PLUGIN MANAGER PLUGIN * * PLUGIN MANAGER PLUGIN *
*********************************************/ *********************************************/
@@ -518,8 +290,8 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str
if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_plugin_schema *p=NULL; struct registered_plugin_schema *p=NULL;
int tid=stellar_get_current_thread_index(); //int tid=stellar_get_current_thread_index();
stellar_per_stage_message_counter_set(plug_mgr, tid, 0); //stellar_per_stage_message_counter_set(plug_mgr, tid, 0);
while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
{ {
if(p->on_packet[in_out]) if(p->on_packet[in_out])
@@ -527,7 +299,7 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str
p->on_packet[in_out](pkt, p->plugin_env); p->on_packet[in_out](pkt, p->plugin_env);
} }
} }
stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue); //stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue);
return; return;
} }
@@ -540,11 +312,11 @@ void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, str
{ {
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT); plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT);
int tid=stellar_get_current_thread_index(); //int tid=stellar_get_current_thread_index();
stellar_per_stage_message_counter_set(plug_mgr, tid, -1); //stellar_per_stage_message_counter_set(plug_mgr, tid, -1);
stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue, //stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue,
plug_mgr->stellar_mq_schema_array); // plug_mgr->stellar_mq_schema_array);
per_thread_packet_exdata_arrary_clean(plug_mgr); //per_thread_packet_exdata_arrary_clean(plug_mgr);
} }
/********************************************* /*********************************************
@@ -552,7 +324,7 @@ void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, str
*********************************************/ *********************************************/
UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL}; UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL};
int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env) int stellar_on_polling_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env)
{ {
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->registered_polling_plugin_array == NULL) if(plug_mgr->registered_polling_plugin_array == NULL)
@@ -584,3 +356,5 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
} }
return polling_state; return polling_state;
} }
#endif

View File

@@ -7,22 +7,19 @@ extern "C"
{ {
#endif #endif
#define MAX_MSG_PER_STAGE 256
struct plugin_manager_schema; struct plugin_manager_schema;
struct plugin_manager_runtime; struct plugin_manager_runtime;
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path, unsigned int max_msg_per_stage); struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path);
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr); void plugin_manager_exit(struct plugin_manager_schema *plug_mgr);
void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt); //TODO
void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt); void *plugin_manager_get_plugin_env(const char *plugin_name);
//return polling work state, 0: idle, 1: working
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
struct exdata_runtime; //void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
struct exdata_runtime *session_exdata_runtime_new(struct stellar *st); //void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
void session_exdata_runtime_free(struct exdata_runtime *exdata_h); //return polling work state, 0: idle, 1: working
//int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@@ -5,109 +5,20 @@ extern "C"
{ {
#endif #endif
#include "plugin_manager.h"
#include "stellar/stellar.h" #include "stellar/stellar.h"
#include "stellar/stellar_mq.h"
#include "uthash/utarray.h" #include "uthash/utarray.h"
#include "exdata/exdata.h"
struct stellar_message;
struct plugin_manager_per_thread_data
{
struct exdata_runtime *exdata_array;
struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list
struct stellar_message *dealth_letter_queue;// dlq list
unsigned long long pub_packet_msg_cnt;
};
struct plugin_manager_schema
{
struct stellar *st;
UT_array *plugin_load_specs_array;
struct exdata_schema *exdata_schema;
UT_array *stellar_mq_schema_array;
UT_array *registered_packet_plugin_array;
UT_array *registered_polling_plugin_array;
int stellar_mq_topic_num;
int mq_topic_subscriber_num;
unsigned int max_message_dispatch;
struct plugin_manager_per_thread_data *per_thread_data;
}__attribute__((aligned(sizeof(void*))));
struct stellar_message
{
struct stellar *st;
struct
{
int topic_id;
enum stellar_mq_priority priority;
} header;
void *body;
struct stellar_message *next, *prev;
} __attribute__((aligned(sizeof(void *))));
typedef struct stellar_mq_subscriber
{
int topic_subscriber_idx;
int plugin_idx;
on_msg_cb_func *plugin_msg_cb;
void *plugin_msg_cb_arg;
struct stellar_mq_subscriber *next, *prev;
}stellar_mq_subscriber __attribute__((aligned(sizeof(void*))));
struct stellar_mq_topic_schema
{
char *topic_name;
int topic_id;
int subscriber_cnt;
int is_destroyed;
on_msg_dispatch_cb_func *dispatch_cb;
void *dispatch_cb_arg;
stellar_msg_free_cb_func *free_cb;
void *free_cb_arg;
struct stellar_mq_subscriber *subscribers;
}__attribute__((aligned(sizeof(void*))));
enum packet_stage
{
PACKET_STAGE_INPUT=0,
PACKET_STAGE_OUTPUT,
PACKET_STAGE_MAX
};
struct registered_plugin_schema
{
char ip_protocol;
plugin_on_packet_func *on_packet[PACKET_STAGE_MAX];
void *plugin_env;
UT_array *registed_mq_subscriber_info;
}__attribute__((aligned(sizeof(void*))));
struct registered_polling_plugin_schema
{
plugin_on_polling_func *on_polling;
void *plugin_env;
}__attribute__((aligned(sizeof(void*))));
struct stellar_mq_subscriber_info
{
int topic_id;
int subscriber_idx;
}__attribute__((aligned(sizeof(void*))));
/******************************* /*******************************
* PLUGIN MANAGER INIT & EXIT * * PLUGIN MANAGER INIT & EXIT *
*******************************/ *******************************/
#include <dlfcn.h> struct plugin_manager_schema
{
struct stellar *st;
UT_array *plugin_load_specs_array;
}__attribute__((aligned(sizeof(void*))));
struct plugin_specific struct plugin_specific
{ {

View File

@@ -109,7 +109,7 @@ static inline void clean_session(struct session_manager *sess_mgr, uint64_t now_
for (uint64_t j = 0; j < nr_sess_cleaned; j++) for (uint64_t j = 0; j < nr_sess_cleaned; j++)
{ {
sess = cleaned_sess[j]; sess = cleaned_sess[j];
session_exdata_runtime_free(session_get_user_data(sess)); //session_exdata_runtime_free(session_get_user_data(sess));
session_manager_free_session(sess_mgr, sess); session_manager_free_session(sess_mgr, sess);
} }
} }
@@ -183,7 +183,7 @@ static void *worker_thread(void *arg)
defraged_pkt = NULL; defraged_pkt = NULL;
pkt = &packets[i]; pkt = &packets[i];
plugin_manager_on_packet_input(plug_mgr, pkt); //plugin_manager_on_packet_input(plug_mgr, pkt);
if (packet_is_fragment(pkt)) if (packet_is_fragment(pkt))
{ {
defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now_ms); defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now_ms);
@@ -194,7 +194,7 @@ static void *worker_thread(void *arg)
else else
{ {
pkt = defraged_pkt; pkt = defraged_pkt;
plugin_manager_on_packet_input(plug_mgr, defraged_pkt); //plugin_manager_on_packet_input(plug_mgr, defraged_pkt);
} }
} }
@@ -206,8 +206,8 @@ static void *worker_thread(void *arg)
{ {
goto fast_path; goto fast_path;
} }
struct exdata_runtime *per_sess_exdata=session_exdata_runtime_new(st); //struct exdata_runtime *per_sess_exdata=session_exdata_runtime_new(st);
session_set_user_data(sess, per_sess_exdata); //session_set_user_data(sess, per_sess_exdata);
} }
else else
{ {
@@ -220,12 +220,12 @@ static void *worker_thread(void *arg)
fast_path: fast_path:
if (pkt == defraged_pkt) if (pkt == defraged_pkt)
{ {
plugin_manager_on_packet_output(plug_mgr, defraged_pkt); //plugin_manager_on_packet_output(plug_mgr, defraged_pkt);
plugin_manager_on_packet_output(plug_mgr, &packets[i]); //plugin_manager_on_packet_output(plug_mgr, &packets[i]);
} }
else else
{ {
plugin_manager_on_packet_output(plug_mgr, pkt); //plugin_manager_on_packet_output(plug_mgr, pkt);
} }
if (sess) if (sess)
@@ -272,7 +272,7 @@ static void *worker_thread(void *arg)
idle_tasks: idle_tasks:
clean_session(sess_mgr, now_ms); clean_session(sess_mgr, now_ms);
ip_reassembly_expire(ip_reass, now_ms); ip_reassembly_expire(ip_reass, now_ms);
plugin_manager_on_polling(plug_mgr); //plugin_manager_on_polling(plug_mgr);
stellar_stat_merge(runtime->stat, &thr_stat, thr_idx, now_ms); stellar_stat_merge(runtime->stat, &thr_stat, thr_idx, now_ms);
if (nr_pkt_received == 0) if (nr_pkt_received == 0)
@@ -448,7 +448,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
CORE_LOG_ERROR("unable to create stellar stat"); CORE_LOG_ERROR("unable to create stellar stat");
goto error_out; goto error_out;
} }
runtime->plug_mgr = plugin_manager_init(st, plugin_cfg_file, MAX_MSG_PER_STAGE); runtime->plug_mgr = plugin_manager_init(st, plugin_cfg_file);
if (runtime->plug_mgr == NULL) if (runtime->plug_mgr == NULL)
{ {
CORE_LOG_ERROR("unable to create plugin manager"); CORE_LOG_ERROR("unable to create plugin manager");