diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 3c9947a..8e10935 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -2,4 +2,5 @@ add_subdirectory(timeout) add_subdirectory(dablooms) add_subdirectory(toml) add_subdirectory(rbtree) -add_subdirectory(interval_tree) \ No newline at end of file +add_subdirectory(interval_tree) +add_subdirectory(bitmap) \ No newline at end of file diff --git a/deps/bitmap/CMakeLists.txt b/deps/bitmap/CMakeLists.txt new file mode 100644 index 0000000..4eefd63 --- /dev/null +++ b/deps/bitmap/CMakeLists.txt @@ -0,0 +1,3 @@ +set(CMAKE_C_FLAGS "-std=c99") +add_definitions(-fPIC) +add_library(bitmap STATIC bitmap.c) \ No newline at end of file diff --git a/deps/bitmap/bitmap.c b/deps/bitmap/bitmap.c new file mode 100644 index 0000000..59fb4b7 --- /dev/null +++ b/deps/bitmap/bitmap.c @@ -0,0 +1,59 @@ +#include +#include +#include + +struct bitmap { + unsigned char *data; + int width; + int height; +}; + +struct bitmap * bitmap_new(int width, int height, int value) { + struct bitmap *bmp = (struct bitmap *)malloc(sizeof(struct bitmap)); + bmp->width = width; + bmp->height = height; + int size = (width * height + 7) / 8; // Calculate total bytes needed + bmp->data = (unsigned char *)calloc(size,1 ); + memset(bmp->data, value ? 0xFF : 0x00, size); + return bmp; +} + +int bitmap_set(struct bitmap *bmp, int x, int y, int value) { + if (x < 0 || y < 0 || x >= bmp->width || y >= bmp->height) { + return -1; // Return error code if coordinates are out of bounds + } + int idx = y * bmp->width + x; + if (value) + bmp->data[idx / 8] |= (1 << (idx % 8)); + else + bmp->data[idx / 8] &= ~(1 << (idx % 8)); + return 0; +} + +int bitmap_get(struct bitmap *bmp, int x, int y) { + if (x < 0 || y < 0 || x >= bmp->width || y >= bmp->height) { + return -1; // Return error code if coordinates are out of bounds + } + int idx = y * bmp->width + x; + return (bmp->data[idx / 8] & (1 << (idx % 8))) != 0; +} + +void bitmap_free(struct bitmap *bmp) { + if(bmp) + { + if(bmp->data)free(bmp->data); + free(bmp); + } +} + + + + +int test_bitmap() { + struct bitmap *bmp = bitmap_new(10, 5, 1); // Create a 10x5 bitmap + if (bitmap_set(bmp, 2, 2, 1) == 0) { // Set bit at position (2,2) + printf("Bit at (2,2): %d\n", bitmap_get(bmp, 2, 2)); // Get bit at position (2,2) + } + bitmap_free(bmp); + return 0; +} diff --git a/deps/bitmap/bitmap.h b/deps/bitmap/bitmap.h new file mode 100644 index 0000000..210ea35 --- /dev/null +++ b/deps/bitmap/bitmap.h @@ -0,0 +1,5 @@ +struct bitmap; +struct bitmap * bitmap_new(int width, int height, int value); +int bitmap_set(struct bitmap *bmp, int x, int y, int value); +int bitmap_get(struct bitmap *bmp, int x, int y); +void bitmap_free(struct bitmap *bmp); \ No newline at end of file diff --git a/include/stellar/session.h b/include/stellar/session.h index 6d3ae34..5a94a73 100644 --- a/include/stellar/session.h +++ b/include/stellar/session.h @@ -123,10 +123,18 @@ enum session_timestamp }; struct session; +#define SESSION_SEEN_C2S_FLOW (1 << 0) +#define SESSION_SEEN_S2C_FLOW (1 << 1) +int session_is_symmetric(struct session *sess, unsigned char *flag); int session_has_duplicate_traffic(const struct session *sess); enum session_type session_get_type(const struct session *sess); enum session_state session_get_state(const struct session *sess); + +enum session_state session_get_current_state(const struct session *sess); +const struct packet *session_get0_current_packet(struct session *sess); +const char *session_get0_current_payload(struct session *sess, size_t *payload_len); + enum closing_reason session_get_closing_reason(const struct session *sess); enum session_direction session_get_direction(const struct session *sess); enum flow_direction session_get_current_flow_direction(const struct session *sess); @@ -141,6 +149,8 @@ uint64_t session_get_id(const struct session *sess); uint64_t session_get_timestamp(const struct session *sess, enum session_timestamp type); uint64_t session_get_stat(const struct session *sess, enum flow_direction dir, enum session_stat stat); +const char *session_get0_readable_addr(struct session *sess); + #ifdef __cplusplus } #endif diff --git a/include/stellar/session_exdata.h b/include/stellar/session_exdata.h new file mode 100644 index 0000000..a10139e --- /dev/null +++ b/include/stellar/session_exdata.h @@ -0,0 +1,17 @@ +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "stellar.h" + +typedef void session_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg); +int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *arg); +int session_exdata_set(struct session *sess, int idx, void *ex_ptr); +void *session_exdata_get(struct session *sess, int idx); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/include/stellar/session_mq.h b/include/stellar/session_mq.h new file mode 100644 index 0000000..2ad9f9f --- /dev/null +++ b/include/stellar/session_mq.h @@ -0,0 +1,33 @@ +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "stellar.h" + +//session mq +typedef void msg_free_cb_func(void *msg, void *msg_free_arg); +typedef void on_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env); + +//return topic_id +int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *msg_free_cb, void *msg_free_arg); + +int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name); + +int stellar_session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *msg_free_cb, void *msg_free_arg); + +int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id); + +//return 0 if success, otherwise return -1. +int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id); + +int session_mq_publish_message(struct session *sess, int topic_id, void *msg); + +int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id); +int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index e02076d..8ef9c2b 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -9,6 +9,48 @@ extern "C" #include #include "stellar/session.h" +#include "session.h" + +struct session; +struct stellar; + +//return plugin_env +typedef void *plugin_on_load_func(struct stellar *st); +typedef void plugin_on_unload_func(void *plugin_env); + +//return per_session_ctx +typedef void *session_ctx_new_func(struct session *sess, void *plugin_env); +typedef void session_ctx_free_func(struct session *sess, void *session_ctx, void *plugin_env); + +#define TOPIC_TCP "TCP" +#define TOPIC_TCP_STREAM "TCP_STREAM" +#define TOPIC_UDP "UDP" +#define TOPIC_EGRESS "EGRESS" +#define TOPIC_CONTROL_PACKET "CONTROL_PACKET" + +//return session plugin_id +int stellar_session_plugin_register(struct stellar *st, + session_ctx_new_func session_ctx_new, + session_ctx_free_func session_ctx_free, + void *plugin_env); + +void stellar_session_plugin_dettach_current_session(struct session *sess); + + + +struct packet; +typedef void plugin_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env); + +//return packet plugin_id +int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env); + + +//return polling work result, 0: no work, 1: work +typedef int plugin_on_polling_func(void *plugin_env); + +//return polling plugin_id +int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env); + uint16_t stellar_get_current_thread_index(); // return inject packet length, return 0 if failed diff --git a/include/stellar/utils.h b/include/stellar/utils.h new file mode 100644 index 0000000..fb1dc5b --- /dev/null +++ b/include/stellar/utils.h @@ -0,0 +1,36 @@ +#pragma once + +#include //calloc +#include //NULL + +#define ALLOC(type, number) ((type *)calloc(sizeof(type), number)) +#define CALLOC(type, number) ((type *)calloc(sizeof(type), number)) + +#define REALLOC(type, ptr, number) ((type *)realloc(ptr, (number) * sizeof(type))) + +#define FREE(p) {free(p); p = NULL;} + +#define TRUE 1 +#define FALSE 0 + +#ifndef MAX +#define MAX(a, b) (((a) > (b)) ? (a) : (b)) +#endif + +#ifndef MIN +#define MIN(a, b) (((a) < (b)) ? (a) : (b)) +#endif + +#ifndef offsetof +#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER) +#endif + +#ifndef container_of +#define container_of(ptr, type, member) ({ \ + const typeof( ((type *)0)->member ) *__mptr = (ptr); \ + (type *)( (char *)__mptr - offsetof(type,member) );}) +#endif + +#ifndef __unused +#define __unused __attribute__((__unused__)) +#endif diff --git a/src/plugin/CMakeLists.txt b/src/plugin/CMakeLists.txt index 3dbcff5..5ad4c6d 100644 --- a/src/plugin/CMakeLists.txt +++ b/src/plugin/CMakeLists.txt @@ -1,3 +1,4 @@ add_library(plugin_manager plugin_manager.cpp) target_include_directories(plugin_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) -target_link_libraries(plugin_manager session_manager core) \ No newline at end of file +target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/) +target_link_libraries(plugin_manager bitmap toml session_manager core ${CMAKE_DL_LIBS}) \ No newline at end of file diff --git a/src/plugin/plugin_manager.cpp b/src/plugin/plugin_manager.cpp index df2e920..86373eb 100644 --- a/src/plugin/plugin_manager.cpp +++ b/src/plugin/plugin_manager.cpp @@ -3,80 +3,940 @@ #include "session_priv.h" #include "stellar_priv.h" -struct plugin_manager + +#include "plugin_manager.h" + +#include "stellar/utils.h" +#include "stellar/session.h" +#include "stellar/session_exdata.h" +#include "stellar/session_mq.h" +#include "tcp_reassembly.h" + +extern "C" { - // TODO + #include "uthash/utlist.h" + #include "uthash/utarray.h" + #include "bitmap/bitmap.h" +} + +struct plugin_manager_schema +{ + struct stellar *st; + UT_array *session_exdata_schema_array; + UT_array *plugin_load_specs_array; + UT_array *session_mq_schema_array; + UT_array *registered_session_plugin_array; + UT_array *registered_packet_plugin_array; + UT_array *registered_polling_plugin_array; + int topic_num; + int subscriber_num; + int tcp_topic_id; + int udp_topic_id; + int tcp_stream_topic_id; + int egress_topic_id; + int control_packet_topic_id; }; -void *plugin_manager_new_ctx(struct session *sess) + +struct session_exdata_schema { - // TODO - return sess; + char *name; + session_exdata_free *free_func; + void *free_arg; + int idx; +}; + +struct session_message +{ + int topic_id; + void *msg_data; + struct session_message *next, *prev; +}; + +typedef struct session_mq_subscriber +{ + int topic_subscriber_idx; + int session_plugin_id; + on_msg_cb_func *msg_cb; + struct session_mq_subscriber *next, *prev; +}session_mq_subscribers; + +struct session_mq_topic_schema +{ + char *topic_name; + msg_free_cb_func *free_cb; + void *free_cb_arg; + int topic_id; + int subscriber_cnt; + struct session_mq_subscriber *subscribers; +}; + +enum plugin_ctx_state +{ INIT, ACTIVE, EXIT }; + +struct session_plugin_ctx_runtime +{ + enum plugin_ctx_state state; + int session_plugin_id; + void *plugin_ctx; +}; + +struct plugin_exdata +{ + void *exdata; +}; + +struct plugin_manager_runtime +{ + struct plugin_manager_schema *plug_mgr; + struct session *sess; + struct session_message *pending_mq;// message list + struct session_message *delivered_mq;// message list + struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber + struct plugin_exdata *plugin_exdata_array; + struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free + int current_session_plugin_id; +}; + +struct registered_packet_plugin_schema +{ + char ip_protocol; + plugin_on_packet_func *on_packet; + void *plugin_env; +}; + +struct registered_polling_plugin_schema +{ + plugin_on_polling_func *on_polling; + void *plugin_env; +}; + +struct session_mq_subscriber_info +{ + int topic_id; + int subscriber_idx; +}; + +struct registered_session_plugin_schema +{ + session_ctx_new_func *on_ctx_new; + session_ctx_free_func *on_ctx_free; + void *plugin_env; + UT_array *registed_session_mq_subscriber_info; +}; + +#define PACKET_PULGIN_ID_BASE 0x10000 +#define POLLING_PULGIN_ID_BASE 0x20000 + +/******************************* + * PLUGIN MANAGER INIT & EXIT * + *******************************/ + +#include +#include "toml/toml.h" + +struct plugin_specific +{ + char plugin_name[256]; + plugin_on_load_func *load_cb; + plugin_on_unload_func *unload_cb; + void *plugin_ctx; +}; + + +thread_local struct session *per_thread_scratch_sess; + +inline static void plugin_manager_scratch_session_set(struct session *sess) +{ + per_thread_scratch_sess = sess; } -void plugin_manager_free_ctx(void *ctx) +inline static struct session *plugin_manager_scratch_session_get() { - // TODO - // struct session *sess = (struct session *)ctx; - // char buff[4096] = {0}; - // session_to_json(sess, buff, sizeof(buff)); - // PLUGIN_MANAGER_LOG_DEBUG("=> SESSION : %s", buff); + return per_thread_scratch_sess; } -struct plugin_manager *plugin_manager_new(void) +inline struct plugin_manager_schema * stellar_plugin_manager_schema_get(struct stellar *st) { - // TODO - static struct plugin_manager mgr; - return &mgr; + return st->st_rt->plug_mgr; } -void plugin_manager_free(struct plugin_manager *mgr) +inline int stellar_plugin_manager_schema_set(struct stellar *st, struct plugin_manager_schema *pm) { - // TODO + if(st->st_rt->plug_mgr)return -1; + st->st_rt->plug_mgr=pm; + return 0; } -void plugin_manager_dispatch_session(struct plugin_manager *mgr, struct session *sess, struct packet *pkt) -{ - // TODO - // current implementation only for testing +UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; - struct tcp_segment *seg; - enum session_state state = session_get_state(sess); +static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num) +{ + *spec_num = 0; + FILE* fp = fopen(toml_conf_path, "r"); + if(fp==NULL)return NULL; + char errbuf[256]; + toml_table_t* conf = toml_parse_file(fp, errbuf, sizeof(errbuf)); + fclose(fp); + if (!conf) { + fprintf(stderr, "Error parsing toml: %s\n", errbuf); + return NULL; + } + toml_array_t* plugin_array = toml_array_in(conf, "plugin"); + if(plugin_array==NULL)return NULL; + *spec_num = toml_array_nelem(plugin_array); + struct plugin_specific* plugins = ALLOC(struct plugin_specific, *spec_num); + + for (int i = 0; i < *spec_num; i++) { + toml_table_t* plugin = toml_table_at(plugin_array, i); + + const char *path_raw = toml_raw_in(plugin, "path"); + const char *init_func_name_raw = toml_raw_in(plugin, "init"); + const char *exit_func_name_raw = toml_raw_in(plugin, "exit"); + char *path = NULL; + char *init_func_name = NULL; + char *exit_func_name = NULL; + if (toml_rtos(path_raw, &path) || toml_rtos(init_func_name_raw, &init_func_name) || + toml_rtos(exit_func_name_raw, &exit_func_name)) + { + goto PLUGIN_SPEC_LOAD_ERROR; + } + void* handle = dlopen(path, RTLD_NOW|RTLD_LAZY|RTLD_GLOBAL); + if (!handle) { + fprintf(stderr, "Error loading plugin %s: %s\n", path, dlerror()); + goto PLUGIN_SPEC_LOAD_ERROR; + } + + plugins[i].load_cb = (plugin_on_load_func *) dlsym(handle, init_func_name); + if (!plugins[i].load_cb) { + fprintf(stderr, "Could not load init function %s: %s\n", init_func_name, dlerror()); + } + + plugins[i].unload_cb = (plugin_on_unload_func *) dlsym(handle, exit_func_name); + if (!plugins[i].unload_cb) { + fprintf(stderr, "Could not load exit function %s: %s\n", exit_func_name, dlerror()); + } + FREE(path); + FREE(init_func_name); + FREE(exit_func_name); + } + toml_free(conf); + return plugins; +PLUGIN_SPEC_LOAD_ERROR: + toml_free(conf); + FREE(plugins); + return NULL; +} + +#include "session_priv.h" +static void tcp_stream_msg_free_fn(void *msg, void *msg_free_arg) +{ + struct session *cur_sess = plugin_manager_scratch_session_get(); + if(msg && cur_sess)session_free_tcp_segment(cur_sess, (struct tcp_segment *)msg); +} + +struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path) +{ + int spec_num; + struct plugin_specific *specs = plugin_specs_load(plugin_spec_file_path, &spec_num); + if(spec_num < 0) + { + return NULL; + } + struct plugin_manager_schema *pm = ALLOC(struct plugin_manager_schema, 1); + if(spec_num > 0) + { + utarray_new(pm->plugin_load_specs_array,&plugin_specs_icd); + utarray_reserve(pm->plugin_load_specs_array, spec_num); + } + + pm->st = st; + stellar_plugin_manager_schema_set(st, pm); + + + pm->tcp_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP, NULL, NULL); + pm->tcp_stream_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, NULL); + pm->udp_topic_id=stellar_session_mq_create_topic(st, TOPIC_UDP, NULL, NULL); + pm->egress_topic_id=stellar_session_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL); + pm->control_packet_topic_id=stellar_session_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL); + + for(int i = 0; i < spec_num; i++) + { + if (specs[i].load_cb != NULL) + { + specs[i].plugin_ctx=specs[i].load_cb(st); + utarray_push_back(pm->plugin_load_specs_array, &specs[i]); + } + } + FREE(specs); + return pm; +} + +void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) +{ + struct plugin_specific *p=NULL; + if (plug_mgr->plugin_load_specs_array) + { + while ((p = (struct plugin_specific *)utarray_next(plug_mgr->plugin_load_specs_array, p))) + { + if (p->unload_cb) + p->unload_cb(p->plugin_ctx); + } + utarray_free(plug_mgr->plugin_load_specs_array); + } + if(plug_mgr->session_mq_schema_array) + { + for(unsigned int i = 0; i < utarray_len(plug_mgr->session_mq_schema_array); i++) + { + stellar_session_mq_destroy_topic(plug_mgr->st, i); + } + utarray_free(plug_mgr->session_mq_schema_array); + } + if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array); + if(plug_mgr->registered_packet_plugin_array)utarray_free(plug_mgr->registered_packet_plugin_array); + if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array); + if(plug_mgr->registered_session_plugin_array) + { + struct registered_session_plugin_schema *s = NULL; + while ((s = (struct registered_session_plugin_schema *)utarray_next(plug_mgr->registered_session_plugin_array, s))) + { + if(s->registed_session_mq_subscriber_info)utarray_free(s->registed_session_mq_subscriber_info); + } + utarray_free(plug_mgr->registered_session_plugin_array); + } + FREE(plug_mgr); + return; +} + + +/******************************* + * SESSION EXDATA * + *******************************/ + +static void session_exdata_met_copy(void *_dst, const void *_src) +{ + struct session_exdata_schema *dst = (struct session_exdata_schema *)_dst, *src = (struct session_exdata_schema *)_src; + dst->free_func = src->free_func; + dst->free_arg = src->free_arg; + dst->idx = src->idx; + dst->name = src->name ? strdup(src->name) : NULL; +} + +static void session_exdata_met_dtor(void *_elt) +{ + struct session_exdata_schema *elt = (struct session_exdata_schema *)_elt; + if (elt->name) + FREE(elt->name); +} + +UT_icd session_exdata_meta_icd = {sizeof(struct session_exdata_schema), NULL, session_exdata_met_copy, session_exdata_met_dtor}; + + +int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *free_arg) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr->session_exdata_schema_array == NULL) + { + utarray_new(plug_mgr->session_exdata_schema_array, &session_exdata_meta_icd); + } + if(plug_mgr->session_exdata_schema_array == NULL)return -1; + unsigned int len = utarray_len(plug_mgr->session_exdata_schema_array); + struct session_exdata_schema *t_schema; + for(unsigned int i = 0; i < len; i++) + { + t_schema = (struct session_exdata_schema *)utarray_eltptr(plug_mgr->session_exdata_schema_array, i); + if(strcmp(t_schema->name, name) == 0) + { + return t_schema->idx; + } + } + struct session_exdata_schema new_schema; + memset(&new_schema, 0, sizeof(struct session_exdata_schema)); + new_schema.free_func=free_func; + new_schema.name=(char *)name; + new_schema.idx=len; + new_schema.free_arg=free_arg; + utarray_push_back(plug_mgr->session_exdata_schema_array, &new_schema); + return new_schema.idx; +} + +int session_exdata_set(struct session *sess, int idx, void *ex_ptr) +{ + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + if(plug_mgr_rt == NULL)return -1; + if(plug_mgr_rt->plug_mgr->session_exdata_schema_array == NULL)return -1; + unsigned int len=utarray_len(plug_mgr_rt->plug_mgr->session_exdata_schema_array); + if(len < (unsigned int)idx)return -1; + if(plug_mgr_rt->plugin_exdata_array==NULL)return -1; + (plug_mgr_rt->plugin_exdata_array+idx)->exdata=ex_ptr; + return 0; +} + +void *session_exdata_get(struct session *sess, int idx) +{ + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + if(plug_mgr_rt == NULL)return NULL; + if(plug_mgr_rt->plug_mgr->session_exdata_schema_array==NULL)return NULL; + unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_exdata_schema_array); + if(len < (unsigned int)idx)return NULL; + return (plug_mgr_rt->plugin_exdata_array+idx)->exdata; +} + +/******************************* + * SESSION MQ * + *******************************/ + + + +static void session_mq_topic_schema_copy(void *_dst, const void *_src) +{ + struct session_mq_topic_schema *dst = (struct session_mq_topic_schema *)_dst, + *src = (struct session_mq_topic_schema *)_src; + dst->subscribers = src->subscribers; + dst->free_cb = src->free_cb; + dst->free_cb_arg = src->free_cb_arg; + dst->topic_id = src->topic_id; + dst->subscriber_cnt = src->subscriber_cnt; + dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL; +} + +static void session_mq_topic_schema_dtor(void *_elt) +{ + struct session_mq_topic_schema *elt = (struct session_mq_topic_schema *)_elt; + if (elt->topic_name) + FREE(elt->topic_name); + // FREE(elt); // free the item +} + +UT_icd session_mq_topic_schema_icd = {sizeof(struct session_mq_topic_schema), NULL, session_mq_topic_schema_copy, session_mq_topic_schema_dtor}; + +void session_mq_free(struct session_message *head) +{ + struct session_message *elt, *tmp; + DL_FOREACH_SAFE(head, elt, tmp) + { + DL_DELETE(head, elt); + FREE(elt); + } + FREE(head); +} + +int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);; + if(topic_name == NULL || plug_mgr == NULL || plug_mgr->session_mq_schema_array == NULL)return -1; + unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); + struct session_mq_topic_schema *t_schema; + for(unsigned int i = 0; i < len; i++) + { + t_schema = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, i); + if(strcmp(t_schema->topic_name, topic_name) == 0) + { + return i; + } + } + return -1; +} + +int stellar_session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr->session_mq_schema_array == NULL)return -1; + unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); + if(len < (unsigned int)topic_id)return -1; + struct session_mq_topic_schema *t_schema = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + if(t_schema == NULL)return -1; + t_schema->free_cb=msg_free_cb; + t_schema->free_cb_arg=msg_free_arg; + return 0; +} + +int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr->session_mq_schema_array == NULL) + { + utarray_new(plug_mgr->session_mq_schema_array, &session_mq_topic_schema_icd); + } + unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); + if(stellar_session_mq_get_topic_id(st, topic_name) >= 0) + { + return -1; + } + struct session_mq_topic_schema t_schema; + memset(&t_schema, 0, sizeof(struct session_mq_topic_schema)); + t_schema.free_cb=msg_free_cb; + t_schema.topic_name=(char *)topic_name; + t_schema.topic_id=len;//topid_id equals arrary index + t_schema.free_cb_arg=msg_free_arg; + t_schema.subscribers=NULL; + t_schema.subscriber_cnt=0; + utarray_push_back(plug_mgr->session_mq_schema_array, &t_schema); + plug_mgr->topic_num+=1; + return t_schema.topic_id; +} + +int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr->session_mq_schema_array==NULL)return 0; + unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); + if (len <= (unsigned int)topic_id) + return -1; + struct session_mq_topic_schema *topic = + (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + struct session_mq_subscriber *sub_elt, *sub_tmp; + + if (topic) + { + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + DL_DELETE(topic->subscribers, sub_elt); + FREE(sub_elt); + } + } + return 0; // success +} + +int session_mq_publish_message(struct session *sess, int topic_id, void *data) +{ + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + if(plug_mgr_rt==NULL || topic_id < 0)return -1; + if(plug_mgr_rt->plug_mgr->session_mq_schema_array==NULL)return -1; + unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_mq_schema_array); + if (len <= (unsigned int)topic_id)return -1; + struct session_message *msg= ALLOC(struct session_message,1); + msg->topic_id = topic_id; + msg->msg_data = data; + DL_APPEND(plug_mgr_rt->pending_mq, msg); + return 0; +} + +static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value) +{ + if(bit_value!=0 && bit_value!=1)return -1; + if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin + if(topic_id < 0 || plugin_id < 0)return -1; + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + if(plug_mgr_rt==NULL)return -1; + if(topic_id >= plug_mgr_rt->plug_mgr->topic_num)return -1;// topic_id out of range + struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + if(topic==NULL)return -1; + + struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plugin_id); + if(session_plugin_schema==NULL)return -1; + + unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info); + if(plug_mgr_rt->session_mq_status) + { + for(unsigned int i=0; i < plugin_subscriber_num; i++) + { + struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i); + if(topic_id==session_plugin_sub_info->topic_id) + { + bitmap_set(plug_mgr_rt->session_mq_status, topic_id, session_plugin_sub_info->subscriber_idx, bit_value); + } + } + return 0; + } + return -1; +} + + +int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id) +{ + return session_mq_set_message_status(sess, topic_id, plugin_id, 0); + +} + +int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id) +{ + return session_mq_set_message_status(sess, topic_id, plugin_id, 1); +} + +UT_icd session_mq_subscriber_info_icd = {sizeof(struct session_mq_subscriber_info), NULL, NULL, NULL}; + +int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id) +{ + if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr->session_mq_schema_array==NULL)return -1; + unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); + if (len <= (unsigned int)topic_id)return -1; + + struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr->registered_session_plugin_array, (unsigned)plugin_id); + if(session_plugin_schema==NULL)return -1; + + struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + if(topic==NULL)return -1; + + if(session_plugin_schema->registed_session_mq_subscriber_info==NULL) + { + utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &session_mq_subscriber_info_icd); + } + + // if plugin already subscribe current topic, return 0 + struct session_mq_subscriber_info *p=NULL; + while( (p=(struct session_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p))) + { + if(p->topic_id==topic_id) + return 0; + }; + + struct session_mq_subscriber *new_subscriber = ALLOC(struct session_mq_subscriber,1); + new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; + new_subscriber->session_plugin_id = plugin_id; + new_subscriber->msg_cb = plugin_on_msg_cb; + DL_APPEND(topic->subscribers, new_subscriber); + + struct session_mq_subscriber_info sub_info; + memset(&sub_info, 0, sizeof(struct session_mq_subscriber_info)); + sub_info.topic_id=topic_id; + sub_info.subscriber_idx=topic->subscriber_cnt; + utarray_push_back(session_plugin_schema->registed_session_mq_subscriber_info, &sub_info); + topic->subscriber_cnt+=1; + plug_mgr->subscriber_num+=1; + return 0; +} + +static void plugin_manager_session_message_dispatch(struct session *sess) +{ + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + if(plug_mgr_rt==NULL)return; + + struct session_message *mq_elt=NULL, *mq_tmp=NULL; + struct session_mq_subscriber *sub_elt, *sub_tmp; + struct session_mq_topic_schema *topic; + struct registered_session_plugin_schema *session_plugin_schema; + struct session_plugin_ctx_runtime *plugin_ctx_rt; + while (plug_mgr_rt->pending_mq != NULL) + { + DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp) + { + topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, + (unsigned int)(mq_elt->topic_id)); + if (topic) + { + int cur_sub_idx = 0; + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0) + { + plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->session_plugin_id); + session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->session_plugin_id); + if(plugin_ctx_rt->state==INIT) + { + if(session_plugin_schema->on_ctx_new) + { + plugin_ctx_rt->plugin_ctx=session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env); + plugin_ctx_rt->state=ACTIVE; + } + } + if(sub_elt->msg_cb)sub_elt->msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, + session_plugin_schema->plugin_env); + } + cur_sub_idx++; + } + if (topic->free_cb) + { + topic->free_cb(mq_elt->msg_data, topic->free_cb_arg); + } + } + DL_DELETE(plug_mgr_rt->pending_mq, mq_elt); + DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list + } + } + return; +} + +/******************************* + * PLUGIN MANAGER SESSION RUNTIME * + *******************************/ + +static struct plugin_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr) +{ + struct plugin_exdata *exdata_rt = NULL; + if(plug_mgr->session_exdata_schema_array==NULL)return NULL; + unsigned int len = utarray_len(plug_mgr->session_exdata_schema_array); + if(len > 0) + { + exdata_rt=ALLOC(struct plugin_exdata, len); + } + return exdata_rt; +} + +static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct session *sess, struct plugin_exdata *exdata_rt) +{ + if(exdata_rt==NULL)return; + if(plug_mgr->session_exdata_schema_array==NULL)return; + unsigned int len=utarray_len(plug_mgr->session_exdata_schema_array); + for (unsigned int i = 0; i < len; i++) + { + void *exdata = (exdata_rt + i)->exdata; + struct session_exdata_schema *schema = (struct session_exdata_schema *)utarray_eltptr(plug_mgr->session_exdata_schema_array, i); + if (exdata) + { + if (schema->free_func) + { + schema->free_func(sess, i, exdata, schema->free_arg); + } + } + } +} + +struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess) +{ + if(plug_mgr->registered_session_plugin_array==NULL)return NULL; + struct plugin_manager_runtime *rt = ALLOC(struct plugin_manager_runtime, 1); + rt->plug_mgr = plug_mgr; + rt->sess = sess; + rt->pending_mq = NULL; + rt->delivered_mq = NULL; + rt->session_mq_status=bitmap_new(plug_mgr->topic_num, plug_mgr->subscriber_num, 1); + rt->plugin_exdata_array = (struct plugin_exdata *)session_exdata_runtime_new(plug_mgr); + rt->plugin_ctx_array = ALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array)); + return rt; + +} + +void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) +{ + if(rt==NULL)return; + if(rt->pending_mq != NULL) + { + session_mq_free(rt->pending_mq); + rt->pending_mq=NULL; + } + if(rt->delivered_mq != NULL) + { + session_mq_free(rt->delivered_mq); + rt->delivered_mq=NULL; + } + if(rt->session_mq_status != NULL) + { + bitmap_free(rt->session_mq_status); + } + unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array); + for(unsigned int i = 0; i < len; i++) + { + struct session_plugin_ctx_runtime *plugin_ctx_rt=(rt->plugin_ctx_array+i); + struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(rt->plug_mgr->registered_session_plugin_array, i); + if(session_plugin_schema->on_ctx_free && plugin_ctx_rt->state==ACTIVE) + { + session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); + } + } + FREE(rt->plugin_ctx_array); + + session_exdata_runtime_free(rt->plug_mgr, rt->sess, rt->plugin_exdata_array); + FREE(rt->plugin_exdata_array); + FREE(rt); +} + + +/********************************************* + * PLUGIN MANAGER PACKET PLUGIN * + *********************************************/ + + +UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL}; + +int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr->registered_packet_plugin_array == NULL) + { + utarray_new(plug_mgr->registered_packet_plugin_array, ®istered_packet_plugin_array_icd); + } + struct registered_packet_plugin_schema packet_plugin_schema; + memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema)); + packet_plugin_schema.ip_protocol = ip_protocol; + packet_plugin_schema.on_packet = on_packet; + packet_plugin_schema.plugin_env = plugin_env; + utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema); + return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array));// return packet plugin_id +} + +void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt) +{ + if(plug_mgr == NULL || pkt == NULL)return; + if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; + struct registered_packet_plugin_schema *p=NULL; + //unsigned char ip_proto=packet_get_layers(pkt); // FIXME get ip_proto + unsigned char ip_proto=0; + while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) + { + if(p->ip_protocol == ip_proto && p->on_packet) + { + p->on_packet(pkt, ip_proto, p->plugin_env); + } + } + return; +} + +/********************************************* + * PLUGIN MANAGER POLLING PLUGIN * + *********************************************/ + + +UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL}; + +int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr->registered_polling_plugin_array == NULL) + { + utarray_new(plug_mgr->registered_polling_plugin_array, ®istered_polling_plugin_array_icd); + } + struct registered_polling_plugin_schema polling_plugin_schema; + memset(&polling_plugin_schema, 0, sizeof(polling_plugin_schema)); + polling_plugin_schema.on_polling = on_polling; + polling_plugin_schema.plugin_env = plugin_env; + utarray_push_back(plug_mgr->registered_polling_plugin_array, &polling_plugin_schema); + return (POLLING_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_polling_plugin_array));// return polling plugin_id +} + +int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr->registered_polling_plugin_array == NULL)return 0; + struct registered_polling_plugin_schema *p=NULL; + int polling_state=0; + while ((p = (struct registered_polling_plugin_schema *)utarray_next(plug_mgr->registered_polling_plugin_array, p))) + { + if(p->on_polling) + { + if(p->on_polling(p->plugin_env)==1) + { + polling_state=1; + } + } + } + return polling_state; +} + +/********************************************* + * PLUGIN MANAGER SESSION PLUGIN * + *********************************************/ + + + +UT_icd registered_session_plugin_schema_icd = {sizeof(struct registered_session_plugin_schema), NULL, NULL, NULL}; + + +int stellar_session_plugin_register(struct stellar *st, + session_ctx_new_func session_ctx_new, + session_ctx_free_func session_ctx_free, + void *plugin_env) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr->registered_session_plugin_array == NULL) + { + utarray_new(plug_mgr->registered_session_plugin_array, ®istered_session_plugin_schema_icd); + } + struct registered_session_plugin_schema session_plugin_schema; + memset(&session_plugin_schema, 0, sizeof(struct registered_session_plugin_schema)); + session_plugin_schema.on_ctx_new = session_ctx_new; + session_plugin_schema.on_ctx_free = session_ctx_free; + session_plugin_schema.plugin_env = plugin_env; + utarray_push_back(plug_mgr->registered_session_plugin_array, &session_plugin_schema); + return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index +} + +void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt) +{ + if(sess == NULL || pkt == NULL)return; + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + if(plug_mgr_rt==NULL)return; + plugin_manager_scratch_session_set(sess); +#if 0 + switch (packet_get_type(pkt)) + { + case TCP: + topic_id=plug_mgr_rt->plug_mgr->tcp_topic_id; + break; + case TCP_STREAM: + topic_id=plug_mgr_rt->plug_mgr->tcp_stream_topic_id; + break; + case UDP: + topic_id=plug_mgr_rt->plug_mgr->udp_topic_id; + break; + case CONTROL: + topic_id=plug_mgr_rt->plug_mgr->control_packet_topic_id; + break; + default: + break; + } +#endif + struct tcp_segment *seg; enum session_type type = session_get_type(sess); - PLUGIN_MANAGER_LOG_DEBUG("=> thread [%d] plugin dispatch session: %u %s %s %s", stellar_get_current_thread_index(), session_get_id(sess), session_get_tuple6_str(sess), session_type_to_str(type), session_state_to_str(state)); - // session_print(sess); if (packet_is_ctrl(pkt)) { - // trigger ctrl msg with (mgr, sess, pkt) - // dispatch_plugin() + session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->control_packet_topic_id ,(void *)pkt); } else { switch (type) { case SESSION_TYPE_TCP: - // trigger TCP msg with (mgr, sess, pkt) - // dispatch_plugin() - while ((seg = session_get_tcp_segment(sess)) != NULL) + session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,(void *)pkt); + if((seg = session_get_tcp_segment(sess)) != NULL) { - // trigger TCP stream msg with (mgr, sess, seg->data, seg->len) - // dispatch_plugin() - session_free_tcp_segment(sess, seg); + session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id ,(void *)seg); + //session_free_tcp_segment(sess, seg); } break; case SESSION_TYPE_UDP: - // trigger UDP msg with (mgr, sess, pkt) - // dispatch_plugin() + session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,(void *)pkt); break; default: assert(0); break; } } + //TODO: check TCP topic active subscirber num, if 0, return disable assembler state, to reduce tcp reassemble overhead + plugin_manager_session_message_dispatch(sess); + plugin_manager_scratch_session_set(NULL); + return; } -void plugin_manager_dispatch_packet(struct plugin_manager *mgr, struct packet *pkt) +void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt) { - // TODO -} \ No newline at end of file + if(sess == NULL || pkt == NULL)return; + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + if(plug_mgr_rt==NULL)return; + plugin_manager_scratch_session_set(sess); + session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt); + plugin_manager_session_message_dispatch(sess); + session_mq_free(plug_mgr_rt->delivered_mq); + plug_mgr_rt->delivered_mq=NULL; + plugin_manager_scratch_session_set(NULL); + return; +} + +void stellar_session_plugin_dettach_current_session(struct session *sess) +{ + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plug_mgr_rt->current_session_plugin_id); + if(session_plugin_schema==NULL)return; + + unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info); + //TODO: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch + if(plug_mgr_rt->session_mq_status) + { + for(unsigned int i=0; i < plugin_subscriber_num; i++) + { + struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i); + bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->topic_id, session_plugin_sub_info->subscriber_idx, 0); + } + } + + if(session_plugin_schema->on_ctx_free) + { + session_plugin_schema->on_ctx_free(sess, (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx, session_plugin_schema->plugin_env); + } + (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx=NULL; + (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->state=EXIT; + return; +} + diff --git a/src/plugin/plugin_manager.h b/src/plugin/plugin_manager.h index 18eb038..ae7577d 100644 --- a/src/plugin/plugin_manager.h +++ b/src/plugin/plugin_manager.h @@ -11,16 +11,23 @@ extern "C" #define PLUGIN_MANAGER_LOG_ERROR(format, ...) LOG_ERROR("plugin manager", format, ##__VA_ARGS__) #define PLUGIN_MANAGER_LOG_DEBUG(format, ...) LOG_DEBUG("plugin manager", format, ##__VA_ARGS__) -// per session context -void *plugin_manager_new_ctx(struct session *sess); -void plugin_manager_free_ctx(void *ctx); +struct plugin_manager_schema; +struct plugin_manager_runtime; -struct plugin_manager; -struct plugin_manager *plugin_manager_new(void); -void plugin_manager_free(struct plugin_manager *mgr); +struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path); +void plugin_manager_exit(struct plugin_manager_schema *plug_mgr); -void plugin_manager_dispatch_session(struct plugin_manager *mgr, struct session *sess, struct packet *pkt); -void plugin_manager_dispatch_packet(struct plugin_manager *mgr, struct packet *pkt); +void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt); + +//return polling work state, 0: idle, 1: working +int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr); + +//publish and dispatch session msg(msg, pkt) on session_mq +void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt); +void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt); + +struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess); +void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt); #ifdef __cplusplus } diff --git a/src/session/session.cpp b/src/session/session.cpp index d9a3612..3139624 100644 --- a/src/session/session.cpp +++ b/src/session/session.cpp @@ -1,5 +1,7 @@ +#include "stellar/session.h" #include +#include "packet_priv.h" #include "session_priv.h" #include "tcp_utils.h" #include "tcp_reassembly.h" @@ -65,6 +67,11 @@ const char *session_get_tuple6_str(const struct session *sess) return sess->tuple_str; } +const char *session_get0_readable_addr(struct session *sess) +{ + return sess->tuple_str; +} + void session_set_direction(struct session *sess, enum session_direction dir) { sess->sess_dir = dir; @@ -95,6 +102,11 @@ enum session_state session_get_state(const struct session *sess) return sess->state; } +enum session_state session_get_current_state(const struct session *sess) +{ + return sess->state; +} + void session_set_type(struct session *sess, enum session_type type) { sess->type = type; @@ -195,6 +207,49 @@ const struct packet *session_get_current_packet(const struct session *sess) return sess->curr_pkt; } +const inline struct packet *session_get0_current_packet(struct session *sess) +{ + return sess->curr_pkt; +} + +const char *session_get0_current_payload(struct session *sess, size_t *payload_len) +{ + const struct packet *pkt=session_get_current_packet(sess); + if(pkt) + { + const struct packet_layer *pkt_layer=packet_get_innermost_layer(pkt, LAYER_TYPE_ALL); + if(pkt_layer) + { + *payload_len=pkt_layer->pld_len; + return pkt_layer->pld_ptr; + } + } + *payload_len=0; + return NULL; +} + +int session_is_symmetric(struct session *sess, unsigned char *flag) +{ + int is_symmetric=0; + if (sess->first_pkt[FLOW_DIRECTION_C2S] && sess->first_pkt[FLOW_DIRECTION_S2C]) + { + if (flag) + *flag = (SESSION_SEEN_C2S_FLOW | SESSION_SEEN_S2C_FLOW); + is_symmetric = 1; + } + else if (sess->first_pkt[FLOW_DIRECTION_C2S]) + { + if (flag) + *flag = SESSION_SEEN_C2S_FLOW; + } + else if (sess->first_pkt[FLOW_DIRECTION_S2C]) + { + if (flag) + *flag = SESSION_SEEN_S2C_FLOW; + } + return is_symmetric; +} + void session_set_user_data(struct session *sess, void *user_data) { sess->user_data = user_data; diff --git a/src/stellar/CMakeLists.txt b/src/stellar/CMakeLists.txt index 2eecfb3..e7153c9 100644 --- a/src/stellar/CMakeLists.txt +++ b/src/stellar/CMakeLists.txt @@ -2,5 +2,7 @@ add_library(core config.cpp stat.cpp stellar.cpp inject.cpp) target_link_libraries(core times plugin_manager session_manager ip_reassembly packet_io pthread fieldstat4 toml) add_executable(stellar main.cpp) -target_link_libraries(stellar core plugin_manager) -install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program) \ No newline at end of file +target_link_libraries(stellar core) +target_link_libraries(stellar "-rdynamic") +install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program) + diff --git a/src/stellar/main.cpp b/src/stellar/main.cpp index 9083411..5f761a3 100644 --- a/src/stellar/main.cpp +++ b/src/stellar/main.cpp @@ -76,6 +76,7 @@ static int all_stat_have_output(void) int main(int argc, char **argv) { + struct stellar st = {runtime}; stellar_update_time_cache(); signal(SIGINT, signal_handler); @@ -110,8 +111,7 @@ int main(int argc, char **argv) STELLAR_LOG_ERROR("unable to create stellar stat"); goto error_out; } - - runtime->plug_mgr = plugin_manager_new(); + runtime->plug_mgr = plugin_manager_init(&st, "./stellar_plugin/spec.toml"); if (runtime->plug_mgr == NULL) { STELLAR_LOG_ERROR("unable to create plugin manager"); @@ -160,7 +160,7 @@ error_out: stellar_thread_join(runtime, config); stellar_thread_clean(runtime, config); packet_io_free(runtime->packet_io); - plugin_manager_free(runtime->plug_mgr); + plugin_manager_exit(runtime->plug_mgr); stellar_stat_free(runtime->stat); STELLAR_LOG_STATE("stellar exit !!!\n"); log_free(); diff --git a/src/stellar/stellar.cpp b/src/stellar/stellar.cpp index 51e93b7..26b7c62 100644 --- a/src/stellar/stellar.cpp +++ b/src/stellar/stellar.cpp @@ -51,7 +51,7 @@ static inline void free_evicted_sessions(struct session_manager *sess_mgr, uint6 if (sess) { plugin_ctx = session_get_user_data(sess); - plugin_manager_free_ctx(plugin_ctx); + plugin_manager_session_runtime_free((struct plugin_manager_runtime*)plugin_ctx); session_manager_free_session(sess_mgr, sess); } else @@ -71,7 +71,7 @@ static inline void free_expired_sessions(struct session_manager *sess_mgr, uint6 if (sess) { plugin_ctx = session_get_user_data(sess); - plugin_manager_free_ctx(plugin_ctx); + plugin_manager_session_runtime_free((struct plugin_manager_runtime*)plugin_ctx); session_manager_free_session(sess_mgr, sess); } else @@ -102,7 +102,7 @@ static void *work_thread(void *arg) struct packet packets[RX_BURST_MAX]; struct session *sess = NULL; struct packet_io *packet_io = runtime->packet_io; - struct plugin_manager *plug_mgr = runtime->plug_mgr; + struct plugin_manager_schema *plug_mgr = runtime->plug_mgr; struct stellar_thread *thread = (struct stellar_thread *)arg; struct ip_reassembly *ip_reass = thread->ip_mgr; struct session_manager *sess_mgr = thread->sess_mgr; @@ -137,7 +137,7 @@ static void *work_thread(void *arg) defraged_pkt = NULL; pkt = &packets[i]; - plugin_manager_dispatch_packet(plug_mgr, pkt); + plugin_manager_on_packet(plug_mgr, pkt); if (packet_is_fragment(pkt)) { defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now); @@ -148,7 +148,7 @@ static void *work_thread(void *arg) else { pkt = defraged_pkt; - plugin_manager_dispatch_packet(plug_mgr, pkt); + plugin_manager_on_packet(plug_mgr, pkt); } } @@ -160,7 +160,7 @@ static void *work_thread(void *arg) { goto fast_path; } - plugin_ctx = plugin_manager_new_ctx(sess); + plugin_ctx = plugin_manager_session_runtime_new(runtime->plug_mgr, sess); session_set_user_data(sess, plugin_ctx); } else @@ -174,9 +174,10 @@ static void *work_thread(void *arg) { packet_set_session_id(pkt, session_get_id(sess)); } - plugin_manager_dispatch_session(plug_mgr, sess, pkt); + plugin_manager_on_session_ingress(sess, pkt); fast_path: + plugin_manager_on_session_egress(sess, pkt); update_session_stat(sess, pkt); if (packet_get_action(pkt) == PACKET_ACTION_DROP) { @@ -219,7 +220,8 @@ static void *work_thread(void *arg) ip_reassembly_expire(ip_reass, now); // TODO - // plugin_manager_cron(); + plugin_manager_on_polling(runtime->plug_mgr); + // session_manager_cron(); // poll_non_packet_events(); if (nr_recv == 0) diff --git a/src/stellar/stellar_priv.h b/src/stellar/stellar_priv.h index 8a2f9b8..3f6faf4 100644 --- a/src/stellar/stellar_priv.h +++ b/src/stellar/stellar_priv.h @@ -30,10 +30,16 @@ struct stellar_runtime uint64_t stat_last_output_ts; struct stellar_stat *stat; struct packet_io *packet_io; - struct plugin_manager *plug_mgr; + struct plugin_manager_schema *plug_mgr; struct stellar_thread threads[MAX_THREAD_NUM]; }; +//FIXME rename stellar_runtime to stellar +struct stellar +{ + struct stellar_runtime *st_rt; +}; + extern struct stellar_runtime *runtime; extern struct stellar_config *config;