feat(plugin manager): integrated plugin manager, build success

This commit is contained in:
yangwei
2024-05-17 16:55:46 +08:00
committed by luwenpeng
parent db611561f6
commit e0ec3f2d52
17 changed files with 1201 additions and 62 deletions

1
deps/CMakeLists.txt vendored
View File

@@ -3,3 +3,4 @@ add_subdirectory(dablooms)
add_subdirectory(toml)
add_subdirectory(rbtree)
add_subdirectory(interval_tree)
add_subdirectory(bitmap)

3
deps/bitmap/CMakeLists.txt vendored Normal file
View File

@@ -0,0 +1,3 @@
set(CMAKE_C_FLAGS "-std=c99")
add_definitions(-fPIC)
add_library(bitmap STATIC bitmap.c)

59
deps/bitmap/bitmap.c vendored Normal file
View File

@@ -0,0 +1,59 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
struct bitmap {
unsigned char *data;
int width;
int height;
};
struct bitmap * bitmap_new(int width, int height, int value) {
struct bitmap *bmp = (struct bitmap *)malloc(sizeof(struct bitmap));
bmp->width = width;
bmp->height = height;
int size = (width * height + 7) / 8; // Calculate total bytes needed
bmp->data = (unsigned char *)calloc(size,1 );
memset(bmp->data, value ? 0xFF : 0x00, size);
return bmp;
}
int bitmap_set(struct bitmap *bmp, int x, int y, int value) {
if (x < 0 || y < 0 || x >= bmp->width || y >= bmp->height) {
return -1; // Return error code if coordinates are out of bounds
}
int idx = y * bmp->width + x;
if (value)
bmp->data[idx / 8] |= (1 << (idx % 8));
else
bmp->data[idx / 8] &= ~(1 << (idx % 8));
return 0;
}
int bitmap_get(struct bitmap *bmp, int x, int y) {
if (x < 0 || y < 0 || x >= bmp->width || y >= bmp->height) {
return -1; // Return error code if coordinates are out of bounds
}
int idx = y * bmp->width + x;
return (bmp->data[idx / 8] & (1 << (idx % 8))) != 0;
}
void bitmap_free(struct bitmap *bmp) {
if(bmp)
{
if(bmp->data)free(bmp->data);
free(bmp);
}
}
int test_bitmap() {
struct bitmap *bmp = bitmap_new(10, 5, 1); // Create a 10x5 bitmap
if (bitmap_set(bmp, 2, 2, 1) == 0) { // Set bit at position (2,2)
printf("Bit at (2,2): %d\n", bitmap_get(bmp, 2, 2)); // Get bit at position (2,2)
}
bitmap_free(bmp);
return 0;
}

5
deps/bitmap/bitmap.h vendored Normal file
View File

@@ -0,0 +1,5 @@
struct bitmap;
struct bitmap * bitmap_new(int width, int height, int value);
int bitmap_set(struct bitmap *bmp, int x, int y, int value);
int bitmap_get(struct bitmap *bmp, int x, int y);
void bitmap_free(struct bitmap *bmp);

View File

@@ -123,10 +123,18 @@ enum session_timestamp
};
struct session;
#define SESSION_SEEN_C2S_FLOW (1 << 0)
#define SESSION_SEEN_S2C_FLOW (1 << 1)
int session_is_symmetric(struct session *sess, unsigned char *flag);
int session_has_duplicate_traffic(const struct session *sess);
enum session_type session_get_type(const struct session *sess);
enum session_state session_get_state(const struct session *sess);
enum session_state session_get_current_state(const struct session *sess);
const struct packet *session_get0_current_packet(struct session *sess);
const char *session_get0_current_payload(struct session *sess, size_t *payload_len);
enum closing_reason session_get_closing_reason(const struct session *sess);
enum session_direction session_get_direction(const struct session *sess);
enum flow_direction session_get_current_flow_direction(const struct session *sess);
@@ -141,6 +149,8 @@ uint64_t session_get_id(const struct session *sess);
uint64_t session_get_timestamp(const struct session *sess, enum session_timestamp type);
uint64_t session_get_stat(const struct session *sess, enum flow_direction dir, enum session_stat stat);
const char *session_get0_readable_addr(struct session *sess);
#ifdef __cplusplus
}
#endif

View File

@@ -0,0 +1,17 @@
#pragma once
#ifdef __cplusplus
extern "C"
{
#endif
#include "stellar.h"
typedef void session_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg);
int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *arg);
int session_exdata_set(struct session *sess, int idx, void *ex_ptr);
void *session_exdata_get(struct session *sess, int idx);
#ifdef __cplusplus
}
#endif

View File

@@ -0,0 +1,33 @@
#pragma once
#ifdef __cplusplus
extern "C"
{
#endif
#include "stellar.h"
//session mq
typedef void msg_free_cb_func(void *msg, void *msg_free_arg);
typedef void on_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env);
//return topic_id
int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *msg_free_cb, void *msg_free_arg);
int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name);
int stellar_session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *msg_free_cb, void *msg_free_arg);
int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id);
//return 0 if success, otherwise return -1.
int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id);
int session_mq_publish_message(struct session *sess, int topic_id, void *msg);
int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id);
int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id);
#ifdef __cplusplus
}
#endif

View File

@@ -9,6 +9,48 @@ extern "C"
#include <stdint.h>
#include "stellar/session.h"
#include "session.h"
struct session;
struct stellar;
//return plugin_env
typedef void *plugin_on_load_func(struct stellar *st);
typedef void plugin_on_unload_func(void *plugin_env);
//return per_session_ctx
typedef void *session_ctx_new_func(struct session *sess, void *plugin_env);
typedef void session_ctx_free_func(struct session *sess, void *session_ctx, void *plugin_env);
#define TOPIC_TCP "TCP"
#define TOPIC_TCP_STREAM "TCP_STREAM"
#define TOPIC_UDP "UDP"
#define TOPIC_EGRESS "EGRESS"
#define TOPIC_CONTROL_PACKET "CONTROL_PACKET"
//return session plugin_id
int stellar_session_plugin_register(struct stellar *st,
session_ctx_new_func session_ctx_new,
session_ctx_free_func session_ctx_free,
void *plugin_env);
void stellar_session_plugin_dettach_current_session(struct session *sess);
struct packet;
typedef void plugin_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env);
//return packet plugin_id
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env);
//return polling work result, 0: no work, 1: work
typedef int plugin_on_polling_func(void *plugin_env);
//return polling plugin_id
int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env);
uint16_t stellar_get_current_thread_index();
// return inject packet length, return 0 if failed

36
include/stellar/utils.h Normal file
View File

@@ -0,0 +1,36 @@
#pragma once
#include <stdlib.h> //calloc
#include <stddef.h> //NULL
#define ALLOC(type, number) ((type *)calloc(sizeof(type), number))
#define CALLOC(type, number) ((type *)calloc(sizeof(type), number))
#define REALLOC(type, ptr, number) ((type *)realloc(ptr, (number) * sizeof(type)))
#define FREE(p) {free(p); p = NULL;}
#define TRUE 1
#define FALSE 0
#ifndef MAX
#define MAX(a, b) (((a) > (b)) ? (a) : (b))
#endif
#ifndef MIN
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#endif
#ifndef offsetof
#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
#endif
#ifndef container_of
#define container_of(ptr, type, member) ({ \
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - offsetof(type,member) );})
#endif
#ifndef __unused
#define __unused __attribute__((__unused__))
#endif

View File

@@ -1,3 +1,4 @@
add_library(plugin_manager plugin_manager.cpp)
target_include_directories(plugin_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(plugin_manager session_manager core)
target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/)
target_link_libraries(plugin_manager bitmap toml session_manager core ${CMAKE_DL_LIBS})

View File

@@ -3,80 +3,940 @@
#include "session_priv.h"
#include "stellar_priv.h"
struct plugin_manager
#include "plugin_manager.h"
#include "stellar/utils.h"
#include "stellar/session.h"
#include "stellar/session_exdata.h"
#include "stellar/session_mq.h"
#include "tcp_reassembly.h"
extern "C"
{
// TODO
#include "uthash/utlist.h"
#include "uthash/utarray.h"
#include "bitmap/bitmap.h"
}
struct plugin_manager_schema
{
struct stellar *st;
UT_array *session_exdata_schema_array;
UT_array *plugin_load_specs_array;
UT_array *session_mq_schema_array;
UT_array *registered_session_plugin_array;
UT_array *registered_packet_plugin_array;
UT_array *registered_polling_plugin_array;
int topic_num;
int subscriber_num;
int tcp_topic_id;
int udp_topic_id;
int tcp_stream_topic_id;
int egress_topic_id;
int control_packet_topic_id;
};
void *plugin_manager_new_ctx(struct session *sess)
struct session_exdata_schema
{
// TODO
return sess;
char *name;
session_exdata_free *free_func;
void *free_arg;
int idx;
};
struct session_message
{
int topic_id;
void *msg_data;
struct session_message *next, *prev;
};
typedef struct session_mq_subscriber
{
int topic_subscriber_idx;
int session_plugin_id;
on_msg_cb_func *msg_cb;
struct session_mq_subscriber *next, *prev;
}session_mq_subscribers;
struct session_mq_topic_schema
{
char *topic_name;
msg_free_cb_func *free_cb;
void *free_cb_arg;
int topic_id;
int subscriber_cnt;
struct session_mq_subscriber *subscribers;
};
enum plugin_ctx_state
{ INIT, ACTIVE, EXIT };
struct session_plugin_ctx_runtime
{
enum plugin_ctx_state state;
int session_plugin_id;
void *plugin_ctx;
};
struct plugin_exdata
{
void *exdata;
};
struct plugin_manager_runtime
{
struct plugin_manager_schema *plug_mgr;
struct session *sess;
struct session_message *pending_mq;// message list
struct session_message *delivered_mq;// message list
struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber
struct plugin_exdata *plugin_exdata_array;
struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
int current_session_plugin_id;
};
struct registered_packet_plugin_schema
{
char ip_protocol;
plugin_on_packet_func *on_packet;
void *plugin_env;
};
struct registered_polling_plugin_schema
{
plugin_on_polling_func *on_polling;
void *plugin_env;
};
struct session_mq_subscriber_info
{
int topic_id;
int subscriber_idx;
};
struct registered_session_plugin_schema
{
session_ctx_new_func *on_ctx_new;
session_ctx_free_func *on_ctx_free;
void *plugin_env;
UT_array *registed_session_mq_subscriber_info;
};
#define PACKET_PULGIN_ID_BASE 0x10000
#define POLLING_PULGIN_ID_BASE 0x20000
/*******************************
* PLUGIN MANAGER INIT & EXIT *
*******************************/
#include <dlfcn.h>
#include "toml/toml.h"
struct plugin_specific
{
char plugin_name[256];
plugin_on_load_func *load_cb;
plugin_on_unload_func *unload_cb;
void *plugin_ctx;
};
thread_local struct session *per_thread_scratch_sess;
inline static void plugin_manager_scratch_session_set(struct session *sess)
{
per_thread_scratch_sess = sess;
}
void plugin_manager_free_ctx(void *ctx)
inline static struct session *plugin_manager_scratch_session_get()
{
// TODO
// struct session *sess = (struct session *)ctx;
// char buff[4096] = {0};
// session_to_json(sess, buff, sizeof(buff));
// PLUGIN_MANAGER_LOG_DEBUG("=> SESSION : %s", buff);
return per_thread_scratch_sess;
}
struct plugin_manager *plugin_manager_new(void)
inline struct plugin_manager_schema * stellar_plugin_manager_schema_get(struct stellar *st)
{
// TODO
static struct plugin_manager mgr;
return &mgr;
return st->st_rt->plug_mgr;
}
void plugin_manager_free(struct plugin_manager *mgr)
inline int stellar_plugin_manager_schema_set(struct stellar *st, struct plugin_manager_schema *pm)
{
// TODO
if(st->st_rt->plug_mgr)return -1;
st->st_rt->plug_mgr=pm;
return 0;
}
void plugin_manager_dispatch_session(struct plugin_manager *mgr, struct session *sess, struct packet *pkt)
{
// TODO
// current implementation only for testing
UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num)
{
*spec_num = 0;
FILE* fp = fopen(toml_conf_path, "r");
if(fp==NULL)return NULL;
char errbuf[256];
toml_table_t* conf = toml_parse_file(fp, errbuf, sizeof(errbuf));
fclose(fp);
if (!conf) {
fprintf(stderr, "Error parsing toml: %s\n", errbuf);
return NULL;
}
toml_array_t* plugin_array = toml_array_in(conf, "plugin");
if(plugin_array==NULL)return NULL;
*spec_num = toml_array_nelem(plugin_array);
struct plugin_specific* plugins = ALLOC(struct plugin_specific, *spec_num);
for (int i = 0; i < *spec_num; i++) {
toml_table_t* plugin = toml_table_at(plugin_array, i);
const char *path_raw = toml_raw_in(plugin, "path");
const char *init_func_name_raw = toml_raw_in(plugin, "init");
const char *exit_func_name_raw = toml_raw_in(plugin, "exit");
char *path = NULL;
char *init_func_name = NULL;
char *exit_func_name = NULL;
if (toml_rtos(path_raw, &path) || toml_rtos(init_func_name_raw, &init_func_name) ||
toml_rtos(exit_func_name_raw, &exit_func_name))
{
goto PLUGIN_SPEC_LOAD_ERROR;
}
void* handle = dlopen(path, RTLD_NOW|RTLD_LAZY|RTLD_GLOBAL);
if (!handle) {
fprintf(stderr, "Error loading plugin %s: %s\n", path, dlerror());
goto PLUGIN_SPEC_LOAD_ERROR;
}
plugins[i].load_cb = (plugin_on_load_func *) dlsym(handle, init_func_name);
if (!plugins[i].load_cb) {
fprintf(stderr, "Could not load init function %s: %s\n", init_func_name, dlerror());
}
plugins[i].unload_cb = (plugin_on_unload_func *) dlsym(handle, exit_func_name);
if (!plugins[i].unload_cb) {
fprintf(stderr, "Could not load exit function %s: %s\n", exit_func_name, dlerror());
}
FREE(path);
FREE(init_func_name);
FREE(exit_func_name);
}
toml_free(conf);
return plugins;
PLUGIN_SPEC_LOAD_ERROR:
toml_free(conf);
FREE(plugins);
return NULL;
}
#include "session_priv.h"
static void tcp_stream_msg_free_fn(void *msg, void *msg_free_arg)
{
struct session *cur_sess = plugin_manager_scratch_session_get();
if(msg && cur_sess)session_free_tcp_segment(cur_sess, (struct tcp_segment *)msg);
}
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
{
int spec_num;
struct plugin_specific *specs = plugin_specs_load(plugin_spec_file_path, &spec_num);
if(spec_num < 0)
{
return NULL;
}
struct plugin_manager_schema *pm = ALLOC(struct plugin_manager_schema, 1);
if(spec_num > 0)
{
utarray_new(pm->plugin_load_specs_array,&plugin_specs_icd);
utarray_reserve(pm->plugin_load_specs_array, spec_num);
}
pm->st = st;
stellar_plugin_manager_schema_set(st, pm);
pm->tcp_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP, NULL, NULL);
pm->tcp_stream_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, NULL);
pm->udp_topic_id=stellar_session_mq_create_topic(st, TOPIC_UDP, NULL, NULL);
pm->egress_topic_id=stellar_session_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL);
pm->control_packet_topic_id=stellar_session_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL);
for(int i = 0; i < spec_num; i++)
{
if (specs[i].load_cb != NULL)
{
specs[i].plugin_ctx=specs[i].load_cb(st);
utarray_push_back(pm->plugin_load_specs_array, &specs[i]);
}
}
FREE(specs);
return pm;
}
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
{
struct plugin_specific *p=NULL;
if (plug_mgr->plugin_load_specs_array)
{
while ((p = (struct plugin_specific *)utarray_next(plug_mgr->plugin_load_specs_array, p)))
{
if (p->unload_cb)
p->unload_cb(p->plugin_ctx);
}
utarray_free(plug_mgr->plugin_load_specs_array);
}
if(plug_mgr->session_mq_schema_array)
{
for(unsigned int i = 0; i < utarray_len(plug_mgr->session_mq_schema_array); i++)
{
stellar_session_mq_destroy_topic(plug_mgr->st, i);
}
utarray_free(plug_mgr->session_mq_schema_array);
}
if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array);
if(plug_mgr->registered_packet_plugin_array)utarray_free(plug_mgr->registered_packet_plugin_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
if(plug_mgr->registered_session_plugin_array)
{
struct registered_session_plugin_schema *s = NULL;
while ((s = (struct registered_session_plugin_schema *)utarray_next(plug_mgr->registered_session_plugin_array, s)))
{
if(s->registed_session_mq_subscriber_info)utarray_free(s->registed_session_mq_subscriber_info);
}
utarray_free(plug_mgr->registered_session_plugin_array);
}
FREE(plug_mgr);
return;
}
/*******************************
* SESSION EXDATA *
*******************************/
static void session_exdata_met_copy(void *_dst, const void *_src)
{
struct session_exdata_schema *dst = (struct session_exdata_schema *)_dst, *src = (struct session_exdata_schema *)_src;
dst->free_func = src->free_func;
dst->free_arg = src->free_arg;
dst->idx = src->idx;
dst->name = src->name ? strdup(src->name) : NULL;
}
static void session_exdata_met_dtor(void *_elt)
{
struct session_exdata_schema *elt = (struct session_exdata_schema *)_elt;
if (elt->name)
FREE(elt->name);
}
UT_icd session_exdata_meta_icd = {sizeof(struct session_exdata_schema), NULL, session_exdata_met_copy, session_exdata_met_dtor};
int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
if(plug_mgr->session_exdata_schema_array == NULL)
{
utarray_new(plug_mgr->session_exdata_schema_array, &session_exdata_meta_icd);
}
if(plug_mgr->session_exdata_schema_array == NULL)return -1;
unsigned int len = utarray_len(plug_mgr->session_exdata_schema_array);
struct session_exdata_schema *t_schema;
for(unsigned int i = 0; i < len; i++)
{
t_schema = (struct session_exdata_schema *)utarray_eltptr(plug_mgr->session_exdata_schema_array, i);
if(strcmp(t_schema->name, name) == 0)
{
return t_schema->idx;
}
}
struct session_exdata_schema new_schema;
memset(&new_schema, 0, sizeof(struct session_exdata_schema));
new_schema.free_func=free_func;
new_schema.name=(char *)name;
new_schema.idx=len;
new_schema.free_arg=free_arg;
utarray_push_back(plug_mgr->session_exdata_schema_array, &new_schema);
return new_schema.idx;
}
int session_exdata_set(struct session *sess, int idx, void *ex_ptr)
{
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt == NULL)return -1;
if(plug_mgr_rt->plug_mgr->session_exdata_schema_array == NULL)return -1;
unsigned int len=utarray_len(plug_mgr_rt->plug_mgr->session_exdata_schema_array);
if(len < (unsigned int)idx)return -1;
if(plug_mgr_rt->plugin_exdata_array==NULL)return -1;
(plug_mgr_rt->plugin_exdata_array+idx)->exdata=ex_ptr;
return 0;
}
void *session_exdata_get(struct session *sess, int idx)
{
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt == NULL)return NULL;
if(plug_mgr_rt->plug_mgr->session_exdata_schema_array==NULL)return NULL;
unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_exdata_schema_array);
if(len < (unsigned int)idx)return NULL;
return (plug_mgr_rt->plugin_exdata_array+idx)->exdata;
}
/*******************************
* SESSION MQ *
*******************************/
static void session_mq_topic_schema_copy(void *_dst, const void *_src)
{
struct session_mq_topic_schema *dst = (struct session_mq_topic_schema *)_dst,
*src = (struct session_mq_topic_schema *)_src;
dst->subscribers = src->subscribers;
dst->free_cb = src->free_cb;
dst->free_cb_arg = src->free_cb_arg;
dst->topic_id = src->topic_id;
dst->subscriber_cnt = src->subscriber_cnt;
dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
}
static void session_mq_topic_schema_dtor(void *_elt)
{
struct session_mq_topic_schema *elt = (struct session_mq_topic_schema *)_elt;
if (elt->topic_name)
FREE(elt->topic_name);
// FREE(elt); // free the item
}
UT_icd session_mq_topic_schema_icd = {sizeof(struct session_mq_topic_schema), NULL, session_mq_topic_schema_copy, session_mq_topic_schema_dtor};
void session_mq_free(struct session_message *head)
{
struct session_message *elt, *tmp;
DL_FOREACH_SAFE(head, elt, tmp)
{
DL_DELETE(head, elt);
FREE(elt);
}
FREE(head);
}
int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);;
if(topic_name == NULL || plug_mgr == NULL || plug_mgr->session_mq_schema_array == NULL)return -1;
unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
struct session_mq_topic_schema *t_schema;
for(unsigned int i = 0; i < len; i++)
{
t_schema = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, i);
if(strcmp(t_schema->topic_name, topic_name) == 0)
{
return i;
}
}
return -1;
}
int stellar_session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
if(plug_mgr->session_mq_schema_array == NULL)return -1;
unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
if(len < (unsigned int)topic_id)return -1;
struct session_mq_topic_schema *t_schema = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
if(t_schema == NULL)return -1;
t_schema->free_cb=msg_free_cb;
t_schema->free_cb_arg=msg_free_arg;
return 0;
}
int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
if(plug_mgr->session_mq_schema_array == NULL)
{
utarray_new(plug_mgr->session_mq_schema_array, &session_mq_topic_schema_icd);
}
unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
if(stellar_session_mq_get_topic_id(st, topic_name) >= 0)
{
return -1;
}
struct session_mq_topic_schema t_schema;
memset(&t_schema, 0, sizeof(struct session_mq_topic_schema));
t_schema.free_cb=msg_free_cb;
t_schema.topic_name=(char *)topic_name;
t_schema.topic_id=len;//topid_id equals arrary index
t_schema.free_cb_arg=msg_free_arg;
t_schema.subscribers=NULL;
t_schema.subscriber_cnt=0;
utarray_push_back(plug_mgr->session_mq_schema_array, &t_schema);
plug_mgr->topic_num+=1;
return t_schema.topic_id;
}
int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
if(plug_mgr->session_mq_schema_array==NULL)return 0;
unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
if (len <= (unsigned int)topic_id)
return -1;
struct session_mq_topic_schema *topic =
(struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
struct session_mq_subscriber *sub_elt, *sub_tmp;
if (topic)
{
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
DL_DELETE(topic->subscribers, sub_elt);
FREE(sub_elt);
}
}
return 0; // success
}
int session_mq_publish_message(struct session *sess, int topic_id, void *data)
{
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL || topic_id < 0)return -1;
if(plug_mgr_rt->plug_mgr->session_mq_schema_array==NULL)return -1;
unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
struct session_message *msg= ALLOC(struct session_message,1);
msg->topic_id = topic_id;
msg->msg_data = data;
DL_APPEND(plug_mgr_rt->pending_mq, msg);
return 0;
}
static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value)
{
if(bit_value!=0 && bit_value!=1)return -1;
if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin
if(topic_id < 0 || plugin_id < 0)return -1;
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return -1;
if(topic_id >= plug_mgr_rt->plug_mgr->topic_num)return -1;// topic_id out of range
struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
if(topic==NULL)return -1;
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plugin_id);
if(session_plugin_schema==NULL)return -1;
unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info);
if(plug_mgr_rt->session_mq_status)
{
for(unsigned int i=0; i < plugin_subscriber_num; i++)
{
struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
if(topic_id==session_plugin_sub_info->topic_id)
{
bitmap_set(plug_mgr_rt->session_mq_status, topic_id, session_plugin_sub_info->subscriber_idx, bit_value);
}
}
return 0;
}
return -1;
}
int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id)
{
return session_mq_set_message_status(sess, topic_id, plugin_id, 0);
}
int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id)
{
return session_mq_set_message_status(sess, topic_id, plugin_id, 1);
}
UT_icd session_mq_subscriber_info_icd = {sizeof(struct session_mq_subscriber_info), NULL, NULL, NULL};
int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id)
{
if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
if(plug_mgr->session_mq_schema_array==NULL)return -1;
unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr->registered_session_plugin_array, (unsigned)plugin_id);
if(session_plugin_schema==NULL)return -1;
struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
if(topic==NULL)return -1;
if(session_plugin_schema->registed_session_mq_subscriber_info==NULL)
{
utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &session_mq_subscriber_info_icd);
}
// if plugin already subscribe current topic, return 0
struct session_mq_subscriber_info *p=NULL;
while( (p=(struct session_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p)))
{
if(p->topic_id==topic_id)
return 0;
};
struct session_mq_subscriber *new_subscriber = ALLOC(struct session_mq_subscriber,1);
new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
new_subscriber->session_plugin_id = plugin_id;
new_subscriber->msg_cb = plugin_on_msg_cb;
DL_APPEND(topic->subscribers, new_subscriber);
struct session_mq_subscriber_info sub_info;
memset(&sub_info, 0, sizeof(struct session_mq_subscriber_info));
sub_info.topic_id=topic_id;
sub_info.subscriber_idx=topic->subscriber_cnt;
utarray_push_back(session_plugin_schema->registed_session_mq_subscriber_info, &sub_info);
topic->subscriber_cnt+=1;
plug_mgr->subscriber_num+=1;
return 0;
}
static void plugin_manager_session_message_dispatch(struct session *sess)
{
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return;
struct session_message *mq_elt=NULL, *mq_tmp=NULL;
struct session_mq_subscriber *sub_elt, *sub_tmp;
struct session_mq_topic_schema *topic;
struct registered_session_plugin_schema *session_plugin_schema;
struct session_plugin_ctx_runtime *plugin_ctx_rt;
while (plug_mgr_rt->pending_mq != NULL)
{
DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp)
{
topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array,
(unsigned int)(mq_elt->topic_id));
if (topic)
{
int cur_sub_idx = 0;
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)
{
plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->session_plugin_id);
session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->session_plugin_id);
if(plugin_ctx_rt->state==INIT)
{
if(session_plugin_schema->on_ctx_new)
{
plugin_ctx_rt->plugin_ctx=session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
plugin_ctx_rt->state=ACTIVE;
}
}
if(sub_elt->msg_cb)sub_elt->msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx,
session_plugin_schema->plugin_env);
}
cur_sub_idx++;
}
if (topic->free_cb)
{
topic->free_cb(mq_elt->msg_data, topic->free_cb_arg);
}
}
DL_DELETE(plug_mgr_rt->pending_mq, mq_elt);
DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list
}
}
return;
}
/*******************************
* PLUGIN MANAGER SESSION RUNTIME *
*******************************/
static struct plugin_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr)
{
struct plugin_exdata *exdata_rt = NULL;
if(plug_mgr->session_exdata_schema_array==NULL)return NULL;
unsigned int len = utarray_len(plug_mgr->session_exdata_schema_array);
if(len > 0)
{
exdata_rt=ALLOC(struct plugin_exdata, len);
}
return exdata_rt;
}
static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct session *sess, struct plugin_exdata *exdata_rt)
{
if(exdata_rt==NULL)return;
if(plug_mgr->session_exdata_schema_array==NULL)return;
unsigned int len=utarray_len(plug_mgr->session_exdata_schema_array);
for (unsigned int i = 0; i < len; i++)
{
void *exdata = (exdata_rt + i)->exdata;
struct session_exdata_schema *schema = (struct session_exdata_schema *)utarray_eltptr(plug_mgr->session_exdata_schema_array, i);
if (exdata)
{
if (schema->free_func)
{
schema->free_func(sess, i, exdata, schema->free_arg);
}
}
}
}
struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess)
{
if(plug_mgr->registered_session_plugin_array==NULL)return NULL;
struct plugin_manager_runtime *rt = ALLOC(struct plugin_manager_runtime, 1);
rt->plug_mgr = plug_mgr;
rt->sess = sess;
rt->pending_mq = NULL;
rt->delivered_mq = NULL;
rt->session_mq_status=bitmap_new(plug_mgr->topic_num, plug_mgr->subscriber_num, 1);
rt->plugin_exdata_array = (struct plugin_exdata *)session_exdata_runtime_new(plug_mgr);
rt->plugin_ctx_array = ALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
return rt;
}
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
{
if(rt==NULL)return;
if(rt->pending_mq != NULL)
{
session_mq_free(rt->pending_mq);
rt->pending_mq=NULL;
}
if(rt->delivered_mq != NULL)
{
session_mq_free(rt->delivered_mq);
rt->delivered_mq=NULL;
}
if(rt->session_mq_status != NULL)
{
bitmap_free(rt->session_mq_status);
}
unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array);
for(unsigned int i = 0; i < len; i++)
{
struct session_plugin_ctx_runtime *plugin_ctx_rt=(rt->plugin_ctx_array+i);
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(rt->plug_mgr->registered_session_plugin_array, i);
if(session_plugin_schema->on_ctx_free && plugin_ctx_rt->state==ACTIVE)
{
session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
}
}
FREE(rt->plugin_ctx_array);
session_exdata_runtime_free(rt->plug_mgr, rt->sess, rt->plugin_exdata_array);
FREE(rt->plugin_exdata_array);
FREE(rt);
}
/*********************************************
* PLUGIN MANAGER PACKET PLUGIN *
*********************************************/
UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
if(plug_mgr->registered_packet_plugin_array == NULL)
{
utarray_new(plug_mgr->registered_packet_plugin_array, &registered_packet_plugin_array_icd);
}
struct registered_packet_plugin_schema packet_plugin_schema;
memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema));
packet_plugin_schema.ip_protocol = ip_protocol;
packet_plugin_schema.on_packet = on_packet;
packet_plugin_schema.plugin_env = plugin_env;
utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array));// return packet plugin_id
}
void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
{
if(plug_mgr == NULL || pkt == NULL)return;
if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_packet_plugin_schema *p=NULL;
//unsigned char ip_proto=packet_get_layers(pkt); // FIXME get ip_proto
unsigned char ip_proto=0;
while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
{
if(p->ip_protocol == ip_proto && p->on_packet)
{
p->on_packet(pkt, ip_proto, p->plugin_env);
}
}
return;
}
/*********************************************
* PLUGIN MANAGER POLLING PLUGIN *
*********************************************/
UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL};
int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
if(plug_mgr->registered_polling_plugin_array == NULL)
{
utarray_new(plug_mgr->registered_polling_plugin_array, &registered_polling_plugin_array_icd);
}
struct registered_polling_plugin_schema polling_plugin_schema;
memset(&polling_plugin_schema, 0, sizeof(polling_plugin_schema));
polling_plugin_schema.on_polling = on_polling;
polling_plugin_schema.plugin_env = plugin_env;
utarray_push_back(plug_mgr->registered_polling_plugin_array, &polling_plugin_schema);
return (POLLING_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_polling_plugin_array));// return polling plugin_id
}
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
{
if(plug_mgr->registered_polling_plugin_array == NULL)return 0;
struct registered_polling_plugin_schema *p=NULL;
int polling_state=0;
while ((p = (struct registered_polling_plugin_schema *)utarray_next(plug_mgr->registered_polling_plugin_array, p)))
{
if(p->on_polling)
{
if(p->on_polling(p->plugin_env)==1)
{
polling_state=1;
}
}
}
return polling_state;
}
/*********************************************
* PLUGIN MANAGER SESSION PLUGIN *
*********************************************/
UT_icd registered_session_plugin_schema_icd = {sizeof(struct registered_session_plugin_schema), NULL, NULL, NULL};
int stellar_session_plugin_register(struct stellar *st,
session_ctx_new_func session_ctx_new,
session_ctx_free_func session_ctx_free,
void *plugin_env)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
if(plug_mgr->registered_session_plugin_array == NULL)
{
utarray_new(plug_mgr->registered_session_plugin_array, &registered_session_plugin_schema_icd);
}
struct registered_session_plugin_schema session_plugin_schema;
memset(&session_plugin_schema, 0, sizeof(struct registered_session_plugin_schema));
session_plugin_schema.on_ctx_new = session_ctx_new;
session_plugin_schema.on_ctx_free = session_ctx_free;
session_plugin_schema.plugin_env = plugin_env;
utarray_push_back(plug_mgr->registered_session_plugin_array, &session_plugin_schema);
return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index
}
void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt)
{
if(sess == NULL || pkt == NULL)return;
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return;
plugin_manager_scratch_session_set(sess);
#if 0
switch (packet_get_type(pkt))
{
case TCP:
topic_id=plug_mgr_rt->plug_mgr->tcp_topic_id;
break;
case TCP_STREAM:
topic_id=plug_mgr_rt->plug_mgr->tcp_stream_topic_id;
break;
case UDP:
topic_id=plug_mgr_rt->plug_mgr->udp_topic_id;
break;
case CONTROL:
topic_id=plug_mgr_rt->plug_mgr->control_packet_topic_id;
break;
default:
break;
}
#endif
struct tcp_segment *seg;
enum session_state state = session_get_state(sess);
enum session_type type = session_get_type(sess);
PLUGIN_MANAGER_LOG_DEBUG("=> thread [%d] plugin dispatch session: %u %s %s %s", stellar_get_current_thread_index(), session_get_id(sess), session_get_tuple6_str(sess), session_type_to_str(type), session_state_to_str(state));
// session_print(sess);
if (packet_is_ctrl(pkt))
{
// trigger ctrl msg with (mgr, sess, pkt)
// dispatch_plugin()
session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->control_packet_topic_id ,(void *)pkt);
}
else
{
switch (type)
{
case SESSION_TYPE_TCP:
// trigger TCP msg with (mgr, sess, pkt)
// dispatch_plugin()
while ((seg = session_get_tcp_segment(sess)) != NULL)
session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,(void *)pkt);
if((seg = session_get_tcp_segment(sess)) != NULL)
{
// trigger TCP stream msg with (mgr, sess, seg->data, seg->len)
// dispatch_plugin()
session_free_tcp_segment(sess, seg);
session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id ,(void *)seg);
//session_free_tcp_segment(sess, seg);
}
break;
case SESSION_TYPE_UDP:
// trigger UDP msg with (mgr, sess, pkt)
// dispatch_plugin()
session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,(void *)pkt);
break;
default:
assert(0);
break;
}
}
//TODO: check TCP topic active subscirber num, if 0, return disable assembler state, to reduce tcp reassemble overhead
plugin_manager_session_message_dispatch(sess);
plugin_manager_scratch_session_set(NULL);
return;
}
void plugin_manager_dispatch_packet(struct plugin_manager *mgr, struct packet *pkt)
void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt)
{
// TODO
if(sess == NULL || pkt == NULL)return;
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return;
plugin_manager_scratch_session_set(sess);
session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt);
plugin_manager_session_message_dispatch(sess);
session_mq_free(plug_mgr_rt->delivered_mq);
plug_mgr_rt->delivered_mq=NULL;
plugin_manager_scratch_session_set(NULL);
return;
}
void stellar_session_plugin_dettach_current_session(struct session *sess)
{
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plug_mgr_rt->current_session_plugin_id);
if(session_plugin_schema==NULL)return;
unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info);
//TODO: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch
if(plug_mgr_rt->session_mq_status)
{
for(unsigned int i=0; i < plugin_subscriber_num; i++)
{
struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->topic_id, session_plugin_sub_info->subscriber_idx, 0);
}
}
if(session_plugin_schema->on_ctx_free)
{
session_plugin_schema->on_ctx_free(sess, (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx, session_plugin_schema->plugin_env);
}
(plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx=NULL;
(plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->state=EXIT;
return;
}

View File

@@ -11,16 +11,23 @@ extern "C"
#define PLUGIN_MANAGER_LOG_ERROR(format, ...) LOG_ERROR("plugin manager", format, ##__VA_ARGS__)
#define PLUGIN_MANAGER_LOG_DEBUG(format, ...) LOG_DEBUG("plugin manager", format, ##__VA_ARGS__)
// per session context
void *plugin_manager_new_ctx(struct session *sess);
void plugin_manager_free_ctx(void *ctx);
struct plugin_manager_schema;
struct plugin_manager_runtime;
struct plugin_manager;
struct plugin_manager *plugin_manager_new(void);
void plugin_manager_free(struct plugin_manager *mgr);
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path);
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr);
void plugin_manager_dispatch_session(struct plugin_manager *mgr, struct session *sess, struct packet *pkt);
void plugin_manager_dispatch_packet(struct plugin_manager *mgr, struct packet *pkt);
void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
//return polling work state, 0: idle, 1: working
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
//publish and dispatch session msg(msg, pkt) on session_mq
void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt);
void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt);
struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess);
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt);
#ifdef __cplusplus
}

View File

@@ -1,5 +1,7 @@
#include "stellar/session.h"
#include <assert.h>
#include "packet_priv.h"
#include "session_priv.h"
#include "tcp_utils.h"
#include "tcp_reassembly.h"
@@ -65,6 +67,11 @@ const char *session_get_tuple6_str(const struct session *sess)
return sess->tuple_str;
}
const char *session_get0_readable_addr(struct session *sess)
{
return sess->tuple_str;
}
void session_set_direction(struct session *sess, enum session_direction dir)
{
sess->sess_dir = dir;
@@ -95,6 +102,11 @@ enum session_state session_get_state(const struct session *sess)
return sess->state;
}
enum session_state session_get_current_state(const struct session *sess)
{
return sess->state;
}
void session_set_type(struct session *sess, enum session_type type)
{
sess->type = type;
@@ -195,6 +207,49 @@ const struct packet *session_get_current_packet(const struct session *sess)
return sess->curr_pkt;
}
const inline struct packet *session_get0_current_packet(struct session *sess)
{
return sess->curr_pkt;
}
const char *session_get0_current_payload(struct session *sess, size_t *payload_len)
{
const struct packet *pkt=session_get_current_packet(sess);
if(pkt)
{
const struct packet_layer *pkt_layer=packet_get_innermost_layer(pkt, LAYER_TYPE_ALL);
if(pkt_layer)
{
*payload_len=pkt_layer->pld_len;
return pkt_layer->pld_ptr;
}
}
*payload_len=0;
return NULL;
}
int session_is_symmetric(struct session *sess, unsigned char *flag)
{
int is_symmetric=0;
if (sess->first_pkt[FLOW_DIRECTION_C2S] && sess->first_pkt[FLOW_DIRECTION_S2C])
{
if (flag)
*flag = (SESSION_SEEN_C2S_FLOW | SESSION_SEEN_S2C_FLOW);
is_symmetric = 1;
}
else if (sess->first_pkt[FLOW_DIRECTION_C2S])
{
if (flag)
*flag = SESSION_SEEN_C2S_FLOW;
}
else if (sess->first_pkt[FLOW_DIRECTION_S2C])
{
if (flag)
*flag = SESSION_SEEN_S2C_FLOW;
}
return is_symmetric;
}
void session_set_user_data(struct session *sess, void *user_data)
{
sess->user_data = user_data;

View File

@@ -2,5 +2,7 @@ add_library(core config.cpp stat.cpp stellar.cpp inject.cpp)
target_link_libraries(core times plugin_manager session_manager ip_reassembly packet_io pthread fieldstat4 toml)
add_executable(stellar main.cpp)
target_link_libraries(stellar core plugin_manager)
target_link_libraries(stellar core)
target_link_libraries(stellar "-rdynamic")
install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program)

View File

@@ -76,6 +76,7 @@ static int all_stat_have_output(void)
int main(int argc, char **argv)
{
struct stellar st = {runtime};
stellar_update_time_cache();
signal(SIGINT, signal_handler);
@@ -110,8 +111,7 @@ int main(int argc, char **argv)
STELLAR_LOG_ERROR("unable to create stellar stat");
goto error_out;
}
runtime->plug_mgr = plugin_manager_new();
runtime->plug_mgr = plugin_manager_init(&st, "./stellar_plugin/spec.toml");
if (runtime->plug_mgr == NULL)
{
STELLAR_LOG_ERROR("unable to create plugin manager");
@@ -160,7 +160,7 @@ error_out:
stellar_thread_join(runtime, config);
stellar_thread_clean(runtime, config);
packet_io_free(runtime->packet_io);
plugin_manager_free(runtime->plug_mgr);
plugin_manager_exit(runtime->plug_mgr);
stellar_stat_free(runtime->stat);
STELLAR_LOG_STATE("stellar exit !!!\n");
log_free();

View File

@@ -51,7 +51,7 @@ static inline void free_evicted_sessions(struct session_manager *sess_mgr, uint6
if (sess)
{
plugin_ctx = session_get_user_data(sess);
plugin_manager_free_ctx(plugin_ctx);
plugin_manager_session_runtime_free((struct plugin_manager_runtime*)plugin_ctx);
session_manager_free_session(sess_mgr, sess);
}
else
@@ -71,7 +71,7 @@ static inline void free_expired_sessions(struct session_manager *sess_mgr, uint6
if (sess)
{
plugin_ctx = session_get_user_data(sess);
plugin_manager_free_ctx(plugin_ctx);
plugin_manager_session_runtime_free((struct plugin_manager_runtime*)plugin_ctx);
session_manager_free_session(sess_mgr, sess);
}
else
@@ -102,7 +102,7 @@ static void *work_thread(void *arg)
struct packet packets[RX_BURST_MAX];
struct session *sess = NULL;
struct packet_io *packet_io = runtime->packet_io;
struct plugin_manager *plug_mgr = runtime->plug_mgr;
struct plugin_manager_schema *plug_mgr = runtime->plug_mgr;
struct stellar_thread *thread = (struct stellar_thread *)arg;
struct ip_reassembly *ip_reass = thread->ip_mgr;
struct session_manager *sess_mgr = thread->sess_mgr;
@@ -137,7 +137,7 @@ static void *work_thread(void *arg)
defraged_pkt = NULL;
pkt = &packets[i];
plugin_manager_dispatch_packet(plug_mgr, pkt);
plugin_manager_on_packet(plug_mgr, pkt);
if (packet_is_fragment(pkt))
{
defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now);
@@ -148,7 +148,7 @@ static void *work_thread(void *arg)
else
{
pkt = defraged_pkt;
plugin_manager_dispatch_packet(plug_mgr, pkt);
plugin_manager_on_packet(plug_mgr, pkt);
}
}
@@ -160,7 +160,7 @@ static void *work_thread(void *arg)
{
goto fast_path;
}
plugin_ctx = plugin_manager_new_ctx(sess);
plugin_ctx = plugin_manager_session_runtime_new(runtime->plug_mgr, sess);
session_set_user_data(sess, plugin_ctx);
}
else
@@ -174,9 +174,10 @@ static void *work_thread(void *arg)
{
packet_set_session_id(pkt, session_get_id(sess));
}
plugin_manager_dispatch_session(plug_mgr, sess, pkt);
plugin_manager_on_session_ingress(sess, pkt);
fast_path:
plugin_manager_on_session_egress(sess, pkt);
update_session_stat(sess, pkt);
if (packet_get_action(pkt) == PACKET_ACTION_DROP)
{
@@ -219,7 +220,8 @@ static void *work_thread(void *arg)
ip_reassembly_expire(ip_reass, now);
// TODO
// plugin_manager_cron();
plugin_manager_on_polling(runtime->plug_mgr);
// session_manager_cron();
// poll_non_packet_events();
if (nr_recv == 0)

View File

@@ -30,10 +30,16 @@ struct stellar_runtime
uint64_t stat_last_output_ts;
struct stellar_stat *stat;
struct packet_io *packet_io;
struct plugin_manager *plug_mgr;
struct plugin_manager_schema *plug_mgr;
struct stellar_thread threads[MAX_THREAD_NUM];
};
//FIXME rename stellar_runtime to stellar
struct stellar
{
struct stellar_runtime *st_rt;
};
extern struct stellar_runtime *runtime;
extern struct stellar_config *config;