2024-08-06 20:37:59 +08:00
|
|
|
#include "plugin_manager_interna.h"
|
|
|
|
|
#include "stellar/session.h"
|
2024-05-17 16:55:46 +08:00
|
|
|
#include "stellar/utils.h"
|
|
|
|
|
#include "toml/toml.h"
|
2024-08-06 20:37:59 +08:00
|
|
|
#include "uthash/utlist.h"
|
2024-05-17 16:55:46 +08:00
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
#include "stellar/stellar_core.h"
|
|
|
|
|
#include "session/session_utils.h"
|
|
|
|
|
#include "tuple/tuple.h"
|
|
|
|
|
#include "packet/packet_utils.h"
|
2024-05-17 16:55:46 +08:00
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
/* utarray element descriptor for struct plugin_specific; every hook slot is NULL. */
UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
|
2024-05-17 16:55:46 +08:00
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
// TODO: set scratch_sess to per_thread_data
|
2024-05-17 16:55:46 +08:00
|
|
|
/* Per-thread scratch session pointer; accessed through plugin_manager_scratch_session_set()/_get(). */
thread_local struct session *per_thread_scratch_sess;
|
|
|
|
|
|
|
|
|
|
inline static void plugin_manager_scratch_session_set(struct session *sess)
|
|
|
|
|
{
|
|
|
|
|
per_thread_scratch_sess = sess;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
inline static struct session *plugin_manager_scratch_session_get()
|
|
|
|
|
{
|
|
|
|
|
return per_thread_scratch_sess;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num)
|
|
|
|
|
{
|
|
|
|
|
*spec_num = 0;
|
|
|
|
|
FILE* fp = fopen(toml_conf_path, "r");
|
|
|
|
|
if(fp==NULL)return NULL;
|
|
|
|
|
char errbuf[256];
|
|
|
|
|
toml_table_t* conf = toml_parse_file(fp, errbuf, sizeof(errbuf));
|
|
|
|
|
fclose(fp);
|
|
|
|
|
if (!conf) {
|
|
|
|
|
fprintf(stderr, "Error parsing toml: %s\n", errbuf);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
toml_array_t* plugin_array = toml_array_in(conf, "plugin");
|
|
|
|
|
if(plugin_array==NULL)return NULL;
|
|
|
|
|
*spec_num = toml_array_nelem(plugin_array);
|
2024-08-06 20:37:59 +08:00
|
|
|
struct plugin_specific* plugins = CALLOC(struct plugin_specific, *spec_num);
|
2024-05-17 16:55:46 +08:00
|
|
|
|
|
|
|
|
for (int i = 0; i < *spec_num; i++) {
|
|
|
|
|
toml_table_t* plugin = toml_table_at(plugin_array, i);
|
|
|
|
|
|
|
|
|
|
const char *path_raw = toml_raw_in(plugin, "path");
|
|
|
|
|
const char *init_func_name_raw = toml_raw_in(plugin, "init");
|
|
|
|
|
const char *exit_func_name_raw = toml_raw_in(plugin, "exit");
|
|
|
|
|
char *path = NULL;
|
|
|
|
|
char *init_func_name = NULL;
|
|
|
|
|
char *exit_func_name = NULL;
|
|
|
|
|
if (toml_rtos(path_raw, &path) || toml_rtos(init_func_name_raw, &init_func_name) ||
|
|
|
|
|
toml_rtos(exit_func_name_raw, &exit_func_name))
|
|
|
|
|
{
|
|
|
|
|
goto PLUGIN_SPEC_LOAD_ERROR;
|
|
|
|
|
}
|
|
|
|
|
void* handle = dlopen(path, RTLD_NOW|RTLD_LAZY|RTLD_GLOBAL);
|
|
|
|
|
if (!handle) {
|
|
|
|
|
fprintf(stderr, "Error loading plugin %s: %s\n", path, dlerror());
|
|
|
|
|
goto PLUGIN_SPEC_LOAD_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
plugins[i].load_cb = (plugin_on_load_func *) dlsym(handle, init_func_name);
|
|
|
|
|
if (!plugins[i].load_cb) {
|
|
|
|
|
fprintf(stderr, "Could not load init function %s: %s\n", init_func_name, dlerror());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
plugins[i].unload_cb = (plugin_on_unload_func *) dlsym(handle, exit_func_name);
|
|
|
|
|
if (!plugins[i].unload_cb) {
|
|
|
|
|
fprintf(stderr, "Could not load exit function %s: %s\n", exit_func_name, dlerror());
|
|
|
|
|
}
|
|
|
|
|
FREE(path);
|
|
|
|
|
FREE(init_func_name);
|
|
|
|
|
FREE(exit_func_name);
|
|
|
|
|
}
|
|
|
|
|
toml_free(conf);
|
|
|
|
|
return plugins;
|
|
|
|
|
PLUGIN_SPEC_LOAD_ERROR:
|
|
|
|
|
toml_free(conf);
|
|
|
|
|
FREE(plugins);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
static struct plugin_manger_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st)
|
|
|
|
|
{
|
|
|
|
|
if(st == NULL)return NULL;
|
|
|
|
|
int thread_num=stellar_get_worker_thread_num(st);
|
|
|
|
|
struct plugin_manger_per_thread_data *per_thread_data = CALLOC(struct plugin_manger_per_thread_data, thread_num);
|
|
|
|
|
return per_thread_data;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_data *per_thread_data, struct stellar *st)
|
|
|
|
|
{
|
|
|
|
|
if(per_thread_data == NULL || st == NULL)return;
|
|
|
|
|
int thread_num=stellar_get_worker_thread_num(st);
|
|
|
|
|
struct plugin_manger_per_thread_data *p_data;
|
|
|
|
|
for (int i = 0; i < thread_num; i++)
|
|
|
|
|
{
|
|
|
|
|
p_data=per_thread_data+i;
|
|
|
|
|
if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array);
|
|
|
|
|
}
|
|
|
|
|
FREE(per_thread_data);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-07 13:34:56 +08:00
|
|
|
static void tcp_stream_msg_free_fn(void *msg, void *msg_free_arg __attribute__((unused)))
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
|
|
|
|
struct session *cur_sess = plugin_manager_scratch_session_get();
|
|
|
|
|
if(msg && cur_sess)session_free_tcp_segment(cur_sess, (struct tcp_segment *)msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
|
|
|
|
|
{
|
|
|
|
|
int spec_num;
|
|
|
|
|
struct plugin_specific *specs = plugin_specs_load(plugin_spec_file_path, &spec_num);
|
|
|
|
|
if(spec_num < 0)
|
|
|
|
|
{
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1);
|
|
|
|
|
plug_mgr->max_message_dispatch=MAX_MSG_PER_DISPATCH;
|
2024-05-17 16:55:46 +08:00
|
|
|
if(spec_num > 0)
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd);
|
|
|
|
|
utarray_reserve(plug_mgr->plugin_load_specs_array, spec_num);
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
plug_mgr->st = st;
|
|
|
|
|
stellar_set_plugin_manger(st, plug_mgr);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
plug_mgr->tcp_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, NULL, NULL);
|
|
|
|
|
plug_mgr->tcp_stream_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, NULL);
|
|
|
|
|
plug_mgr->udp_topic_id=stellar_mq_create_topic(st, TOPIC_UDP, NULL, NULL);
|
|
|
|
|
plug_mgr->egress_topic_id=stellar_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL);
|
|
|
|
|
plug_mgr->control_packet_topic_id=stellar_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL);
|
2024-05-17 16:55:46 +08:00
|
|
|
|
|
|
|
|
for(int i = 0; i < spec_num; i++)
|
|
|
|
|
{
|
|
|
|
|
if (specs[i].load_cb != NULL)
|
|
|
|
|
{
|
|
|
|
|
specs[i].plugin_ctx=specs[i].load_cb(st);
|
2024-08-06 20:37:59 +08:00
|
|
|
utarray_push_back(plug_mgr->plugin_load_specs_array, &specs[i]);
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
FREE(specs);
|
2024-08-06 20:37:59 +08:00
|
|
|
plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st);
|
|
|
|
|
return plug_mgr;
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
|
|
|
|
|
{
|
2024-08-07 10:56:58 +08:00
|
|
|
if(plug_mgr==NULL)return;
|
2024-05-17 16:55:46 +08:00
|
|
|
struct plugin_specific *p=NULL;
|
|
|
|
|
if (plug_mgr->plugin_load_specs_array)
|
|
|
|
|
{
|
|
|
|
|
while ((p = (struct plugin_specific *)utarray_next(plug_mgr->plugin_load_specs_array, p)))
|
|
|
|
|
{
|
|
|
|
|
if (p->unload_cb)
|
|
|
|
|
p->unload_cb(p->plugin_ctx);
|
|
|
|
|
}
|
|
|
|
|
utarray_free(plug_mgr->plugin_load_specs_array);
|
|
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
if(plug_mgr->stellar_mq_schema_array)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
stellar_mq_destroy_topic( plug_mgr->st, i);
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
utarray_free(plug_mgr->stellar_mq_schema_array);
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
|
2024-05-17 16:55:46 +08:00
|
|
|
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
|
2024-08-06 20:37:59 +08:00
|
|
|
if(plug_mgr->registered_packet_plugin_array)
|
|
|
|
|
{
|
|
|
|
|
struct registered_packet_plugin_schema *s = NULL;
|
|
|
|
|
while ((s = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s)))
|
|
|
|
|
{
|
|
|
|
|
if(s->registed_packet_mq_subscriber_info)utarray_free(s->registed_packet_mq_subscriber_info);
|
|
|
|
|
}
|
|
|
|
|
utarray_free(plug_mgr->registered_packet_plugin_array);
|
|
|
|
|
}
|
2024-05-17 16:55:46 +08:00
|
|
|
if(plug_mgr->registered_session_plugin_array)
|
|
|
|
|
{
|
|
|
|
|
struct registered_session_plugin_schema *s = NULL;
|
|
|
|
|
while ((s = (struct registered_session_plugin_schema *)utarray_next(plug_mgr->registered_session_plugin_array, s)))
|
|
|
|
|
{
|
|
|
|
|
if(s->registed_session_mq_subscriber_info)utarray_free(s->registed_session_mq_subscriber_info);
|
|
|
|
|
}
|
|
|
|
|
utarray_free(plug_mgr->registered_session_plugin_array);
|
|
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
|
2024-05-17 16:55:46 +08:00
|
|
|
FREE(plug_mgr);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*******************************
|
2024-08-06 20:37:59 +08:00
|
|
|
* STELLAR EXDATA *
|
2024-05-17 16:55:46 +08:00
|
|
|
*******************************/
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
static void stellar_exdata_met_copy(void *_dst, const void *_src)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
struct stellar_exdata_schema *dst = (struct stellar_exdata_schema *)_dst, *src = (struct stellar_exdata_schema *)_src;
|
2024-05-17 16:55:46 +08:00
|
|
|
dst->free_func = src->free_func;
|
|
|
|
|
dst->free_arg = src->free_arg;
|
|
|
|
|
dst->idx = src->idx;
|
|
|
|
|
dst->name = src->name ? strdup(src->name) : NULL;
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
static void stellar_exdata_met_dtor(void *_elt)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
struct stellar_exdata_schema *elt = (struct stellar_exdata_schema *)_elt;
|
2024-05-17 16:55:46 +08:00
|
|
|
if (elt->name)
|
|
|
|
|
FREE(elt->name);
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
/* utarray element descriptor for struct stellar_exdata_schema, wired to stellar_exdata_met_copy and stellar_exdata_met_dtor. */
UT_icd stellar_exdata_meta_icd = {sizeof(struct stellar_exdata_schema), NULL, stellar_exdata_met_copy, stellar_exdata_met_dtor};
|
2024-05-17 16:55:46 +08:00
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
/*
 * Register an exdata slot called `name` and return its index.
 *
 * If `name` is already registered, its free function and argument are
 * replaced and the existing index is returned. Otherwise a new schema entry
 * is appended and its index equals the previous length of the schema array.
 *
 * @param free_func  called with (index, exdata, free_arg) when a stored value is released; may be NULL.
 * @return the slot index, or -1 when `st`/`name` is NULL, `st` has no plugin
 *         manager, or the schema array cannot be created.
 */
int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *free_arg)
{
    if (st == NULL || name == NULL)
    {
        return -1;
    }

    struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
    /* Same guard as stellar_packet_mq_subscribe(): `st` may not have a manager attached. */
    if (plug_mgr == NULL)
    {
        return -1;
    }

    if (plug_mgr->stellar_exdata_schema_array == NULL)
    {
        utarray_new(plug_mgr->stellar_exdata_schema_array, &stellar_exdata_meta_icd);
    }
    if (plug_mgr->stellar_exdata_schema_array == NULL)
    {
        return -1;
    }

    unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array);
    for (unsigned int i = 0; i < len; i++)
    {
        struct stellar_exdata_schema *t_schema =
            (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->stellar_exdata_schema_array, i);
        if (strcmp(t_schema->name, name) == 0)
        {
            t_schema->free_func = free_func;
            t_schema->free_arg = free_arg;
            return t_schema->idx;
        }
    }

    struct stellar_exdata_schema new_schema;
    memset(&new_schema, 0, sizeof(struct stellar_exdata_schema));
    new_schema.free_func = free_func;
    /* The copy hook of the array duplicates the string, so borrowing the caller's pointer here is safe. */
    new_schema.name = (char *)name;
    new_schema.idx = len;
    new_schema.free_arg = free_arg;
    utarray_push_back(plug_mgr->stellar_exdata_schema_array, &new_schema);
    return new_schema.idx;
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
/*
 * Store `ex_ptr` in slot `idx` of `exdata_array`.
 *
 * @param exdata_schema  schema array; its length is the number of valid slots.
 * @param exdata_array   value array, indexed by slot.
 * @return 0 on success; -1 when an argument is NULL, `idx` is outside
 *         [0, length of schema), or the slot is in state EXIT.
 */
int stellar_exdata_set(UT_array *exdata_schema, struct stellar_exdata *exdata_array, int idx, void *ex_ptr)
{
    if (exdata_schema == NULL || exdata_array == NULL)
    {
        return -1;
    }

    /* Valid indexes are 0 .. len-1; idx == len is already one past the end. */
    unsigned int len = utarray_len(exdata_schema);
    if (idx < 0 || len <= (unsigned int)idx)
    {
        return -1;
    }

    struct stellar_exdata *slot = exdata_array + idx;
    if (slot->state == EXIT)
    {
        return -1;
    }
    slot->exdata = ex_ptr;
    return 0;
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
/*
 * Fetch the value stored in slot `idx` of `exdata_array`.
 *
 * @param exdata_schema  schema array; its length is the number of valid slots.
 * @param exdata_array   value array, indexed by slot.
 * @return the stored pointer; NULL when an argument is NULL, `idx` is outside
 *         [0, length of schema), or the slot is in state EXIT.
 */
void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_array, int idx)
{
    if (exdata_schema == NULL || exdata_array == NULL)
    {
        return NULL;
    }

    /* Valid indexes are 0 .. len-1; idx == len is already one past the end. */
    unsigned int len = utarray_len(exdata_schema);
    if (idx < 0 || len <= (unsigned int)idx)
    {
        return NULL;
    }

    struct stellar_exdata *slot = exdata_array + idx;
    if (slot->state == EXIT)
    {
        return NULL;
    }
    return slot->exdata;
}
|
|
|
|
|
|
|
|
|
|
/*******************************
|
2024-08-06 20:37:59 +08:00
|
|
|
* PACKET EXDATA *
|
|
|
|
|
*******************************/
|
|
|
|
|
static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr)
|
|
|
|
|
{
|
|
|
|
|
if(plug_mgr==NULL || plug_mgr->stellar_exdata_schema_array == NULL)return NULL;
|
|
|
|
|
int tid=stellar_get_current_thread_index();
|
|
|
|
|
if(plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array == NULL)
|
|
|
|
|
{
|
|
|
|
|
unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array);
|
|
|
|
|
plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array = CALLOC(struct stellar_exdata, len);
|
|
|
|
|
}
|
|
|
|
|
return plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr)
|
|
|
|
|
{
|
|
|
|
|
if(plug_mgr==NULL || plug_mgr->stellar_exdata_schema_array == NULL)return;
|
|
|
|
|
unsigned int len=utarray_len(plug_mgr->stellar_exdata_schema_array);
|
|
|
|
|
struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr);
|
|
|
|
|
if(per_thread_pkt_exdata_arrary == NULL)return;
|
|
|
|
|
for (unsigned int i = 0; i < len; i++)
|
|
|
|
|
{
|
|
|
|
|
void *exdata = (per_thread_pkt_exdata_arrary + i)->exdata;
|
|
|
|
|
(per_thread_pkt_exdata_arrary + i)->state=EXIT;
|
|
|
|
|
struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->stellar_exdata_schema_array, i);
|
|
|
|
|
if (exdata)
|
|
|
|
|
{
|
|
|
|
|
if (schema->free_func)
|
|
|
|
|
{
|
|
|
|
|
schema->free_func(i, exdata, schema->free_arg);
|
|
|
|
|
}
|
|
|
|
|
(per_thread_pkt_exdata_arrary + i)->exdata=NULL;
|
|
|
|
|
}
|
|
|
|
|
(per_thread_pkt_exdata_arrary + i)->state=INIT;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr)
|
|
|
|
|
{
|
|
|
|
|
if(pkt == NULL)return -1;
|
|
|
|
|
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
|
|
|
|
|
return stellar_exdata_set(plug_mgr->stellar_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void *packet_exdata_get(struct packet *pkt, int idx)
|
|
|
|
|
{
|
|
|
|
|
if(pkt == NULL)return NULL;
|
|
|
|
|
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
|
|
|
|
|
return stellar_exdata_get( plug_mgr->stellar_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*******************************
|
|
|
|
|
* SESSION EXDATA *
|
2024-05-17 16:55:46 +08:00
|
|
|
*******************************/
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
int session_exdata_set(struct session *sess, int idx, void *ex_ptr)
|
|
|
|
|
{
|
|
|
|
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
|
|
|
|
if(plug_mgr_rt == NULL)return -1;
|
|
|
|
|
if(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array == NULL)return -1;
|
|
|
|
|
return stellar_exdata_set(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array, plug_mgr_rt->sess_exdata_array, idx, ex_ptr);
|
|
|
|
|
}
|
2024-05-17 16:55:46 +08:00
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
void *session_exdata_get(struct session *sess, int idx)
|
|
|
|
|
{
|
|
|
|
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
|
|
|
|
if(plug_mgr_rt == NULL)return NULL;
|
|
|
|
|
if(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array==NULL)return NULL;
|
|
|
|
|
return stellar_exdata_get(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array, plug_mgr_rt->sess_exdata_array, idx);
|
|
|
|
|
}
|
2024-05-17 16:55:46 +08:00
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
/*******************************
|
|
|
|
|
* STELLAR MQ *
|
|
|
|
|
*******************************/
|
|
|
|
|
static void stellar_mq_topic_schema_copy(void *_dst, const void *_src)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst,
|
|
|
|
|
*src = (struct stellar_mq_topic_schema *)_src;
|
2024-05-17 16:55:46 +08:00
|
|
|
dst->subscribers = src->subscribers;
|
|
|
|
|
dst->free_cb = src->free_cb;
|
|
|
|
|
dst->free_cb_arg = src->free_cb_arg;
|
|
|
|
|
dst->topic_id = src->topic_id;
|
|
|
|
|
dst->subscriber_cnt = src->subscriber_cnt;
|
|
|
|
|
dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
static void stellar_mq_topic_schema_dtor(void *_elt)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
struct stellar_mq_topic_schema *elt = (struct stellar_mq_topic_schema *)_elt;
|
2024-05-17 16:55:46 +08:00
|
|
|
if (elt->topic_name)
|
|
|
|
|
FREE(elt->topic_name);
|
|
|
|
|
// FREE(elt); // free the item
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
/* utarray element descriptor for struct stellar_mq_topic_schema, wired to stellar_mq_topic_schema_copy and stellar_mq_topic_schema_dtor. */
UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor};
|
2024-05-17 16:55:46 +08:00
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
|
|
|
|
|
UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
|
2024-05-17 16:55:46 +08:00
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
if(topic_name == NULL || mq_schema_array == NULL )return -1;
|
|
|
|
|
unsigned int len = utarray_len(mq_schema_array);
|
|
|
|
|
struct stellar_mq_topic_schema *t_schema;
|
2024-05-17 16:55:46 +08:00
|
|
|
for(unsigned int i = 0; i < len; i++)
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, i);
|
2024-05-17 16:55:46 +08:00
|
|
|
if(strcmp(t_schema->topic_name, topic_name) == 0)
|
|
|
|
|
{
|
|
|
|
|
return i;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
/*
 * Replace the message free callback and its argument for an existing topic.
 *
 * @return 0 on success; -1 when `st` has no plugin manager, no topic exists,
 *         or `topic_id` is not a valid topic index.
 */
int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
    struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
    /* Same guard as stellar_packet_mq_subscribe(): `st` may not have a manager attached. */
    if (plug_mgr == NULL)
    {
        return -1;
    }

    UT_array *mq_schema_array = plug_mgr->stellar_mq_schema_array;
    if (mq_schema_array == NULL)
    {
        return -1;
    }

    /* Same bound as stellar_mq_destroy_topic(): valid ids are 0 .. len-1. */
    unsigned int len = utarray_len(mq_schema_array);
    if (len <= (unsigned int)topic_id)
    {
        return -1;
    }

    struct stellar_mq_topic_schema *t_schema =
        (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id);
    if (t_schema == NULL)
    {
        return -1;
    }

    t_schema->free_cb = msg_free_cb;
    t_schema->free_cb_arg = msg_free_arg;
    return 0;
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
/*
 * Create a new topic called `topic_name`.
 *
 * The topic id equals the topic's index in the schema array. The schema array
 * is created on first use.
 *
 * @param msg_free_cb   called as msg_free_cb(body, msg_free_arg) when a
 *                      message of this topic is freed; may be NULL.
 * @return the new topic id, or -1 when `topic_name` is NULL, `st` has no
 *         plugin manager, or a topic with that name already exists.
 */
int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
    /* A nameless topic can never be looked up and would store a NULL name in the array. */
    if (topic_name == NULL)
    {
        return -1;
    }

    struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
    /* Same guard as stellar_packet_mq_subscribe(): `st` may not have a manager attached. */
    if (plug_mgr == NULL)
    {
        return -1;
    }

    if (plug_mgr->stellar_mq_schema_array == NULL)
    {
        utarray_new(plug_mgr->stellar_mq_schema_array, &stellar_mq_topic_schema_icd);
    }

    unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
    if (stellar_mq_get_topic_id(st, topic_name) >= 0)
    {
        return -1;
    }

    struct stellar_mq_topic_schema t_schema;
    memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema));
    t_schema.free_cb = msg_free_cb;
    /* The copy hook of the array duplicates the string, so borrowing the caller's pointer here is safe. */
    t_schema.topic_name = (char *)topic_name;
    t_schema.topic_id = len; /* topic id equals array index */
    t_schema.free_cb_arg = msg_free_arg;
    t_schema.subscribers = NULL;
    t_schema.subscriber_cnt = 0;
    utarray_push_back(plug_mgr->stellar_mq_schema_array, &t_schema);
    plug_mgr->stellar_mq_topic_num += 1;
    return t_schema.topic_id;
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
int stellar_mq_destroy_topic(struct stellar *st, int topic_id)
|
2024-04-11 16:30:21 +08:00
|
|
|
{
|
2024-05-28 10:26:29 +08:00
|
|
|
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
|
2024-08-06 20:37:59 +08:00
|
|
|
if(plug_mgr->stellar_mq_schema_array==NULL)return -1;
|
|
|
|
|
unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
|
2024-05-17 16:55:46 +08:00
|
|
|
if (len <= (unsigned int)topic_id)
|
|
|
|
|
return -1;
|
2024-08-06 20:37:59 +08:00
|
|
|
struct stellar_mq_topic_schema *topic =
|
|
|
|
|
(struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
|
|
|
|
|
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
|
|
|
|
|
|
|
|
|
|
if(topic == NULL)return -1;
|
|
|
|
|
|
|
|
|
|
if (topic->is_destroyed == 1)return 0;
|
|
|
|
|
|
|
|
|
|
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
|
|
|
|
|
{
|
|
|
|
|
DL_DELETE(topic->subscribers, sub_elt);
|
|
|
|
|
FREE(sub_elt);
|
|
|
|
|
}
|
|
|
|
|
topic->is_destroyed = 1;
|
|
|
|
|
plug_mgr->stellar_mq_topic_num-=1;
|
|
|
|
|
return 1; // success
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
 * Allocate a message for `topic_id` and append it to the queue of the given
 * priority.
 *
 * @param type               ON_SESSION_TOPIC or ON_PACKET_TOPIC; stored in the header.
 * @param data               message body; stored as-is, not copied.
 * @param stellar_mq_schema  topic schema array, used only to validate `topic_id`.
 * @param priority_mq        array of queue heads, indexed by priority.
 * @return 0 on success; -1 when the schema array is NULL or `topic_id` is not
 *         a valid topic index.
 */
static int stellar_mq_publish_message(enum stellar_topic_type type, int topic_id, void *data, UT_array *stellar_mq_schema, struct stellar_message *priority_mq[], enum stellar_mq_priority priority)
{
    if (stellar_mq_schema == NULL)
    {
        return -1;
    }
    if (topic_id < 0)
    {
        return -1;
    }
    if ((unsigned int)topic_id >= utarray_len(stellar_mq_schema))
    {
        return -1;
    }

    struct stellar_message *new_msg = CALLOC(struct stellar_message, 1);
    new_msg->header.topic_id = topic_id;
    new_msg->header.type = type;
    new_msg->header.priority = priority;
    new_msg->body = data;

    DL_APPEND(priority_mq[priority], new_msg);
    return 0;
}
|
|
|
|
|
|
|
|
|
|
/* utarray element descriptor for struct stellar_mq_subscriber_info; every hook slot is NULL. */
UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
|
|
|
|
|
|
|
|
|
|
/*
 * Subscribe plugin `plugin_idx` to `topic_id`.
 *
 * `registed_mq_subscriber_info` is the plugin's own list of (topic id,
 * subscriber position) pairs. If it already has an entry for this topic and
 * the topic's subscriber list has a node at that position, only that node's
 * callback is replaced. Otherwise a new subscriber is appended to the topic
 * and recorded in the plugin's list.
 *
 * @return 0 on success; -1 when an argument is NULL, no topic exists, or
 *         `topic_id` is not a valid topic index.
 */
static int stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topic_id, void *plugin_on_msg_cb, int plugin_idx, UT_array *registed_mq_subscriber_info)
{
    if (plug_mgr == NULL)
    {
        return -1;
    }
    if (plug_mgr->stellar_mq_schema_array == NULL || registed_mq_subscriber_info == NULL)
    {
        return -1;
    }

    unsigned int topic_cnt = utarray_len(plug_mgr->stellar_mq_schema_array);
    if (topic_cnt <= (unsigned int)topic_id)
    {
        return -1;
    }

    struct stellar_mq_topic_schema *topic =
        (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
    if (topic == NULL)
    {
        return -1;
    }

    /* Re-subscription: swap the callback on the subscriber this plugin registered earlier. */
    struct stellar_mq_subscriber_info *known = NULL;
    while ((known = (struct stellar_mq_subscriber_info *)utarray_next(registed_mq_subscriber_info, known)))
    {
        if (known->topic_id != topic_id)
        {
            continue;
        }

        int position = 0;
        for (struct stellar_mq_subscriber *node = topic->subscribers; node != NULL; node = node->next)
        {
            if (position == known->subscriber_idx)
            {
                node->msg_cb = plugin_on_msg_cb;
                return 0;
            }
            position++;
        }
    }

    /* First subscription of this plugin to this topic. */
    struct stellar_mq_subscriber *fresh = CALLOC(struct stellar_mq_subscriber, 1);
    fresh->topic_subscriber_idx = topic->subscriber_cnt;
    fresh->plugin_idx = plugin_idx;
    fresh->msg_cb = plugin_on_msg_cb;
    DL_APPEND(topic->subscribers, fresh);

    struct stellar_mq_subscriber_info record;
    memset(&record, 0, sizeof(struct stellar_mq_subscriber_info));
    record.topic_id = topic_id;
    record.subscriber_idx = topic->subscriber_cnt;
    utarray_push_back(registed_mq_subscriber_info, &record);

    topic->subscriber_cnt += 1;
    plug_mgr->session_topic_subscriber_num += 1;
    return 0;
}
|
|
|
|
|
|
|
|
|
|
static void stellar_mq_dispatch_one_session_message(struct session *sess, struct stellar_message *mq_elt)
|
|
|
|
|
{
|
|
|
|
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
|
|
|
|
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
|
|
|
|
|
struct registered_session_plugin_schema *session_plugin_schema;
|
|
|
|
|
struct session_plugin_ctx_runtime *plugin_ctx_rt;
|
|
|
|
|
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array,
|
|
|
|
|
(unsigned int)(mq_elt->header.topic_id));
|
2024-05-17 16:55:46 +08:00
|
|
|
|
|
|
|
|
if (topic)
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
int cur_sub_idx = 0;
|
2024-05-17 16:55:46 +08:00
|
|
|
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
plug_mgr_rt->current_session_plugin_id = sub_elt->plugin_idx;
|
|
|
|
|
if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)
|
|
|
|
|
{
|
|
|
|
|
plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + sub_elt->plugin_idx);
|
|
|
|
|
session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(
|
|
|
|
|
plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
|
|
|
|
|
if (session_plugin_schema)
|
|
|
|
|
{
|
|
|
|
|
if (plugin_ctx_rt->state == INIT)
|
|
|
|
|
{
|
|
|
|
|
if (session_plugin_schema->on_ctx_new)
|
|
|
|
|
{
|
|
|
|
|
plugin_ctx_rt->plugin_ctx =
|
|
|
|
|
session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
|
|
|
|
|
if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free)
|
|
|
|
|
{
|
|
|
|
|
session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx,
|
|
|
|
|
session_plugin_schema->plugin_env);
|
|
|
|
|
plugin_ctx_rt->plugin_ctx = NULL;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
plugin_ctx_rt->state = ACTIVE;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (sub_elt->sess_msg_cb &&
|
|
|
|
|
bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) !=
|
|
|
|
|
0) // ctx_new maybe call detach, need check again
|
|
|
|
|
{
|
|
|
|
|
sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx,
|
|
|
|
|
session_plugin_schema->plugin_env);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
cur_sub_idx++;
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
if (cur_sub_idx == 0)
|
|
|
|
|
bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0);
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void stellar_mq_dispatch_one_packet_message(struct packet *pkt, struct stellar_message *mq_elt)
|
|
|
|
|
{
|
|
|
|
|
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
|
|
|
|
|
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
|
|
|
|
|
struct registered_packet_plugin_schema *packet_plugin_schema;
|
|
|
|
|
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,
|
|
|
|
|
(unsigned int)(mq_elt->header.topic_id));
|
|
|
|
|
if (topic)
|
|
|
|
|
{
|
|
|
|
|
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
|
|
|
|
|
{
|
|
|
|
|
if (sub_elt->pkt_msg_cb)
|
|
|
|
|
{
|
|
|
|
|
packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(
|
|
|
|
|
plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
|
|
|
|
|
if (packet_plugin_schema)
|
|
|
|
|
{
|
|
|
|
|
sub_elt->pkt_msg_cb(pkt, mq_elt->header.topic_id, mq_elt->body, packet_plugin_schema->plugin_env);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue, struct session *sess, struct packet *pkt)
|
|
|
|
|
{
|
|
|
|
|
struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
|
|
|
|
|
int cur_priority = STELLAR_MQ_PRIORITY_HIGH;
|
|
|
|
|
while(cur_priority >= STELLAR_MQ_PRIORITY_LOW)
|
|
|
|
|
{
|
|
|
|
|
if(priority_mq[cur_priority]==NULL)
|
|
|
|
|
{
|
|
|
|
|
cur_priority--;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
DL_FOREACH_SAFE(priority_mq[cur_priority], mq_elt, mq_tmp)
|
|
|
|
|
{
|
|
|
|
|
if(mq_elt->header.type==ON_SESSION_TOPIC)stellar_mq_dispatch_one_session_message(sess, mq_elt);
|
|
|
|
|
if(mq_elt->header.type==ON_PACKET_TOPIC)stellar_mq_dispatch_one_packet_message(pkt, mq_elt);
|
|
|
|
|
DL_DELETE(priority_mq[mq_elt->header.priority], mq_elt);
|
|
|
|
|
DL_APPEND(*dealth_letter_queue, mq_elt); // move to dlq list
|
|
|
|
|
|
|
|
|
|
cur_priority=STELLAR_MQ_PRIORITY_HIGH;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
 * Free every message on the list `*head`.
 *
 * For each message, the free callback of its topic (if the topic exists and
 * has one) is invoked on the message body before the message node itself is
 * unlinked and freed.
 */
static void stellar_mq_free(struct stellar_message **head, UT_array *mq_schema_array)
{
    struct stellar_message *msg, *following;
    DL_FOREACH_SAFE(*head, msg, following)
    {
        struct stellar_mq_topic_schema *schema = (struct stellar_mq_topic_schema *)utarray_eltptr(
            mq_schema_array, (unsigned int)(msg->header.topic_id));
        if (schema != NULL && schema->free_cb != NULL)
        {
            schema->free_cb(msg->body, schema->free_cb_arg);
        }
        DL_DELETE(*head, msg);
        FREE(msg);
    }
}
|
|
|
|
|
|
|
|
|
|
/*******************************
|
|
|
|
|
* PACKET MQ *
|
|
|
|
|
*******************************/
|
|
|
|
|
|
|
|
|
|
//return 0 if success, otherwise return -1.
|
|
|
|
|
int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id)
|
|
|
|
|
{
|
|
|
|
|
if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin
|
|
|
|
|
int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE;
|
|
|
|
|
|
|
|
|
|
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
|
|
|
|
|
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1;
|
|
|
|
|
|
|
|
|
|
struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx);
|
|
|
|
|
if(packet_plugin_schema==NULL)return -1;
|
|
|
|
|
|
|
|
|
|
if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL)
|
|
|
|
|
{
|
|
|
|
|
utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_idx, packet_plugin_schema->registed_packet_mq_subscriber_info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int packet_mq_publish_message_with_priority(struct packet *pkt, int topic_id, void *data, enum stellar_mq_priority priority)
|
|
|
|
|
{
|
|
|
|
|
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
|
|
|
|
|
int tid = stellar_get_current_thread_index();
|
|
|
|
|
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt == -1)return -1;
|
|
|
|
|
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1;
|
|
|
|
|
if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,priority)==0)
|
|
|
|
|
{
|
|
|
|
|
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=1;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data)
|
|
|
|
|
{
|
|
|
|
|
return packet_mq_publish_message_with_priority(pkt, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*******************************
|
|
|
|
|
* SESSION MQ *
|
|
|
|
|
*******************************/
|
|
|
|
|
|
|
|
|
|
int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum stellar_mq_priority priority)
|
|
|
|
|
{
|
|
|
|
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
|
|
|
|
assert(plug_mgr_rt);
|
|
|
|
|
if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
|
|
|
|
|
if(plug_mgr_rt->pub_session_msg_cnt == -1)return -1;
|
|
|
|
|
if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1;
|
|
|
|
|
int tid = stellar_get_current_thread_index();
|
|
|
|
|
if(stellar_mq_publish_message(ON_SESSION_TOPIC ,topic_id, data, plug_mgr_rt->plug_mgr->stellar_mq_schema_array,plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq,priority)==0)
|
|
|
|
|
{
|
|
|
|
|
plug_mgr_rt->pub_session_msg_cnt+=1;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
return -1;
|
2024-04-11 16:30:21 +08:00
|
|
|
}
|
|
|
|
|
|
2024-05-17 16:55:46 +08:00
|
|
|
int session_mq_publish_message(struct session *sess, int topic_id, void *data)
|
2024-04-11 16:30:21 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
return session_mq_publish_message_with_priority(sess, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void session_mq_update_topic_status(struct plugin_manager_runtime *plug_mgr_rt, struct stellar_mq_topic_schema *topic)
|
|
|
|
|
{
|
|
|
|
|
//update topic status
|
|
|
|
|
switch (bitmap_is_all_zero(plug_mgr_rt->session_mq_status, 0, topic->topic_id, topic->subscriber_cnt))
|
|
|
|
|
{
|
|
|
|
|
case 1:
|
|
|
|
|
bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 0);
|
|
|
|
|
break;
|
|
|
|
|
case 0:
|
|
|
|
|
bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 1);
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
return;
|
2024-04-11 16:30:21 +08:00
|
|
|
}
|
|
|
|
|
|
2024-05-17 16:55:46 +08:00
|
|
|
static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value)
|
2024-04-11 16:30:21 +08:00
|
|
|
{
|
2024-05-17 16:55:46 +08:00
|
|
|
if(bit_value!=0 && bit_value!=1)return -1;
|
|
|
|
|
if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin
|
|
|
|
|
if(topic_id < 0 || plugin_id < 0)return -1;
|
2024-08-06 20:37:59 +08:00
|
|
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
2024-05-17 16:55:46 +08:00
|
|
|
if(plug_mgr_rt==NULL)return -1;
|
2024-08-06 20:37:59 +08:00
|
|
|
if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range
|
|
|
|
|
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
|
2024-05-17 16:55:46 +08:00
|
|
|
if(topic==NULL)return -1;
|
2024-04-11 16:30:21 +08:00
|
|
|
|
2024-05-17 16:55:46 +08:00
|
|
|
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plugin_id);
|
|
|
|
|
if(session_plugin_schema==NULL)return -1;
|
|
|
|
|
|
|
|
|
|
unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info);
|
|
|
|
|
if(plug_mgr_rt->session_mq_status)
|
|
|
|
|
{
|
|
|
|
|
for(unsigned int i=0; i < plugin_subscriber_num; i++)
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
|
2024-05-17 16:55:46 +08:00
|
|
|
if(topic_id==session_plugin_sub_info->topic_id)
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx, topic_id, bit_value);
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
|
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
session_mq_update_topic_status(plug_mgr_rt, topic);
|
2024-05-17 16:55:46 +08:00
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Clear (bit value 0) the plugin's status bits for topic_id on this session; returns the helper's result. */
int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id)
{
    const int ignore_bit = 0;
    return session_mq_set_message_status(sess, topic_id, plugin_id, ignore_bit);
}
|
|
|
|
|
|
|
|
|
|
/* Set (bit value 1) the plugin's status bits for topic_id on this session; returns the helper's result. */
int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id)
{
    const int unignore_bit = 1;
    return session_mq_set_message_status(sess, topic_id, plugin_id, unignore_bit);
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
/*
 * Subscribe a session plugin's callback to a message topic.
 *
 * Returns -1 when plugin_id lies in the packet plugin range, when the
 * callback is NULL, when the plugin manager or its session plugin array is
 * missing, or when no session plugin is registered under plugin_id.
 * Otherwise returns the result of stellar_mq_subscribe().
 */
int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id)
{
    // ignore packet plugin
    if (plugin_id >= PACKET_PULGIN_ID_BASE)
    {
        return -1;
    }
    if (plugin_on_msg_cb == NULL)
    {
        return -1;
    }

    struct plugin_manager_schema *mgr = stellar_get_plugin_manager(st);
    if (mgr == NULL)
    {
        return -1;
    }
    if (mgr->registered_session_plugin_array == NULL)
    {
        return -1;
    }

    struct registered_session_plugin_schema *plugin =
        (struct registered_session_plugin_schema *)utarray_eltptr(mgr->registered_session_plugin_array, (unsigned)plugin_id);
    if (plugin == NULL)
    {
        return -1;
    }

    // the per-plugin subscription list is created on the first subscribe
    if (plugin->registed_session_mq_subscriber_info == NULL)
    {
        utarray_new(plugin->registed_session_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
    }

    //session plugin id equals to plugin idx
    return stellar_mq_subscribe(mgr, topic_id, (void *)plugin_on_msg_cb, plugin_id, plugin->registed_session_mq_subscriber_info);
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
int session_mq_topic_is_active(struct session *sess, int topic_id)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
|
|
|
|
assert(plug_mgr_rt);
|
|
|
|
|
if(plug_mgr_rt->session_topic_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
|
|
|
|
|
if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range
|
|
|
|
|
if(bitmap_get(plug_mgr_rt->session_topic_status, 0, topic_id) == 0)return 0;
|
|
|
|
|
return 1;
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*******************************
|
|
|
|
|
* PLUGIN MANAGER SESSION RUNTIME *
|
|
|
|
|
*******************************/
|
2024-08-06 20:37:59 +08:00
|
|
|
static struct stellar_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
struct stellar_exdata *exdata_rt = NULL;
|
|
|
|
|
if(plug_mgr->stellar_exdata_schema_array==NULL)return NULL;
|
|
|
|
|
unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array);
|
2024-05-17 16:55:46 +08:00
|
|
|
if(len > 0)
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
exdata_rt=CALLOC(struct stellar_exdata, len);
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
|
|
|
|
return exdata_rt;
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct stellar_exdata *exdata_rt)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
|
|
|
|
if(exdata_rt==NULL)return;
|
2024-08-06 20:37:59 +08:00
|
|
|
if(plug_mgr->stellar_exdata_schema_array==NULL)return;
|
|
|
|
|
unsigned int len=utarray_len(plug_mgr->stellar_exdata_schema_array);
|
2024-05-17 16:55:46 +08:00
|
|
|
for (unsigned int i = 0; i < len; i++)
|
|
|
|
|
{
|
|
|
|
|
void *exdata = (exdata_rt + i)->exdata;
|
2024-08-06 20:37:59 +08:00
|
|
|
(exdata_rt + i)->state=EXIT;
|
|
|
|
|
struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->stellar_exdata_schema_array, i);
|
2024-05-17 16:55:46 +08:00
|
|
|
if (exdata)
|
|
|
|
|
{
|
|
|
|
|
if (schema->free_func)
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
schema->free_func(i, exdata, schema->free_arg);
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess)
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1);
|
2024-05-17 16:55:46 +08:00
|
|
|
rt->plug_mgr = plug_mgr;
|
|
|
|
|
rt->sess = sess;
|
2024-08-06 20:37:59 +08:00
|
|
|
rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->stellar_mq_topic_num, 1);
|
|
|
|
|
rt->session_topic_status=bitmap_new(1, plug_mgr->stellar_mq_topic_num, 1);
|
|
|
|
|
rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr);
|
|
|
|
|
if(plug_mgr->registered_session_plugin_array)
|
|
|
|
|
rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
|
2024-05-17 16:55:46 +08:00
|
|
|
return rt;
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
|
|
|
|
|
{
|
|
|
|
|
if(rt==NULL)return;
|
2024-08-06 20:37:59 +08:00
|
|
|
|
2024-05-17 16:55:46 +08:00
|
|
|
if(rt->session_mq_status != NULL)
|
|
|
|
|
{
|
|
|
|
|
bitmap_free(rt->session_mq_status);
|
2024-08-06 20:37:59 +08:00
|
|
|
rt->session_mq_status=NULL;
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
if(rt->session_topic_status != NULL)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
bitmap_free(rt->session_topic_status);
|
|
|
|
|
rt->session_topic_status=NULL;
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
if (rt->plug_mgr->registered_session_plugin_array)
|
|
|
|
|
{
|
|
|
|
|
unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array);
|
|
|
|
|
for (unsigned int i = 0; i < len; i++)
|
|
|
|
|
{
|
|
|
|
|
struct session_plugin_ctx_runtime *plugin_ctx_rt = (rt->plugin_ctx_array + i);
|
|
|
|
|
struct registered_session_plugin_schema *session_plugin_schema =
|
|
|
|
|
(struct registered_session_plugin_schema *)utarray_eltptr(rt->plug_mgr->registered_session_plugin_array, i);
|
|
|
|
|
if (session_plugin_schema->on_ctx_free && plugin_ctx_rt->state == ACTIVE)
|
|
|
|
|
{
|
|
|
|
|
session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx,
|
|
|
|
|
session_plugin_schema->plugin_env);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
FREE(rt->plugin_ctx_array);
|
|
|
|
|
}
|
|
|
|
|
session_exdata_runtime_free(rt->plug_mgr, rt->sess_exdata_array);
|
|
|
|
|
FREE(rt->sess_exdata_array);
|
2024-05-17 16:55:46 +08:00
|
|
|
FREE(rt);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*********************************************
|
|
|
|
|
* PLUGIN MANAGER PACKET PLUGIN *
|
|
|
|
|
*********************************************/
|
|
|
|
|
UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_cb, void *plugin_env)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-05-28 10:26:29 +08:00
|
|
|
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
|
2024-05-17 16:55:46 +08:00
|
|
|
if(plug_mgr->registered_packet_plugin_array == NULL)
|
|
|
|
|
{
|
|
|
|
|
utarray_new(plug_mgr->registered_packet_plugin_array, ®istered_packet_plugin_array_icd);
|
|
|
|
|
}
|
|
|
|
|
struct registered_packet_plugin_schema packet_plugin_schema;
|
|
|
|
|
memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema));
|
2024-08-06 20:37:59 +08:00
|
|
|
packet_plugin_schema.ip_protocol = ip_proto;
|
|
|
|
|
packet_plugin_schema.on_packet = on_packet_cb;
|
2024-05-17 16:55:46 +08:00
|
|
|
packet_plugin_schema.plugin_env = plugin_env;
|
|
|
|
|
utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
|
2024-08-06 20:37:59 +08:00
|
|
|
return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index + PACKET_PULGIN_ID_BASE
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-08 15:35:29 +08:00
|
|
|
if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
|
2024-05-17 16:55:46 +08:00
|
|
|
struct registered_packet_plugin_schema *p=NULL;
|
2024-08-06 20:37:59 +08:00
|
|
|
|
|
|
|
|
//TODO: get innermost layer ip protocol by packet api
|
|
|
|
|
struct tuple6 t6;
|
|
|
|
|
packet_get_innermost_tuple6(pkt, &t6);
|
|
|
|
|
unsigned char ip_proto=t6.ip_proto;
|
|
|
|
|
|
|
|
|
|
int tid=stellar_get_current_thread_index();
|
|
|
|
|
//TODO : provide public api to reset pub_msg_cnt
|
|
|
|
|
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt
|
2024-05-17 16:55:46 +08:00
|
|
|
while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
|
|
|
|
|
{
|
|
|
|
|
if(p->ip_protocol == ip_proto && p->on_packet)
|
|
|
|
|
{
|
|
|
|
|
p->on_packet(pkt, ip_proto, p->plugin_env);
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt);
|
2024-05-17 16:55:46 +08:00
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
|
|
|
|
|
{
|
2024-08-08 15:35:29 +08:00
|
|
|
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
|
2024-08-06 20:37:59 +08:00
|
|
|
int tid=stellar_get_current_thread_index();
|
|
|
|
|
stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt);
|
|
|
|
|
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish
|
|
|
|
|
stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue,
|
|
|
|
|
plug_mgr->stellar_mq_schema_array);
|
|
|
|
|
per_thread_packet_exdata_arrary_clean(plug_mgr);
|
|
|
|
|
}
|
|
|
|
|
|
2024-05-17 16:55:46 +08:00
|
|
|
/*********************************************
|
|
|
|
|
* PLUGIN MANAGER POLLING PLUGIN *
|
|
|
|
|
*********************************************/
|
|
|
|
|
UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL};
|
|
|
|
|
|
|
|
|
|
int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env)
|
|
|
|
|
{
|
2024-05-28 10:26:29 +08:00
|
|
|
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
|
2024-05-17 16:55:46 +08:00
|
|
|
if(plug_mgr->registered_polling_plugin_array == NULL)
|
|
|
|
|
{
|
|
|
|
|
utarray_new(plug_mgr->registered_polling_plugin_array, ®istered_polling_plugin_array_icd);
|
|
|
|
|
}
|
|
|
|
|
struct registered_polling_plugin_schema polling_plugin_schema;
|
|
|
|
|
memset(&polling_plugin_schema, 0, sizeof(polling_plugin_schema));
|
|
|
|
|
polling_plugin_schema.on_polling = on_polling;
|
|
|
|
|
polling_plugin_schema.plugin_env = plugin_env;
|
|
|
|
|
utarray_push_back(plug_mgr->registered_polling_plugin_array, &polling_plugin_schema);
|
2024-08-06 20:37:59 +08:00
|
|
|
return (POLLING_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_polling_plugin_array)-1);// return polling plugin_id, equals to polling plugin arrary index + POLLING_PULGIN_ID_BASE
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
|
|
|
|
|
{
|
2024-08-08 15:35:29 +08:00
|
|
|
if(plug_mgr==NULL || plug_mgr->registered_polling_plugin_array == NULL)return 0;
|
2024-05-17 16:55:46 +08:00
|
|
|
struct registered_polling_plugin_schema *p=NULL;
|
|
|
|
|
int polling_state=0;
|
|
|
|
|
while ((p = (struct registered_polling_plugin_schema *)utarray_next(plug_mgr->registered_polling_plugin_array, p)))
|
|
|
|
|
{
|
|
|
|
|
if(p->on_polling)
|
|
|
|
|
{
|
|
|
|
|
if(p->on_polling(p->plugin_env)==1)
|
|
|
|
|
{
|
|
|
|
|
polling_state=1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return polling_state;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*********************************************
|
|
|
|
|
* PLUGIN MANAGER SESSION PLUGIN *
|
|
|
|
|
*********************************************/
|
|
|
|
|
UT_icd registered_session_plugin_schema_icd = {sizeof(struct registered_session_plugin_schema), NULL, NULL, NULL};
|
|
|
|
|
|
|
|
|
|
int stellar_session_plugin_register(struct stellar *st,
|
|
|
|
|
session_ctx_new_func session_ctx_new,
|
|
|
|
|
session_ctx_free_func session_ctx_free,
|
|
|
|
|
void *plugin_env)
|
|
|
|
|
{
|
2024-05-28 10:26:29 +08:00
|
|
|
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
|
2024-05-17 16:55:46 +08:00
|
|
|
if(plug_mgr->registered_session_plugin_array == NULL)
|
|
|
|
|
{
|
|
|
|
|
utarray_new(plug_mgr->registered_session_plugin_array, ®istered_session_plugin_schema_icd);
|
|
|
|
|
}
|
|
|
|
|
struct registered_session_plugin_schema session_plugin_schema;
|
|
|
|
|
memset(&session_plugin_schema, 0, sizeof(struct registered_session_plugin_schema));
|
|
|
|
|
session_plugin_schema.on_ctx_new = session_ctx_new;
|
|
|
|
|
session_plugin_schema.on_ctx_free = session_ctx_free;
|
|
|
|
|
session_plugin_schema.plugin_env = plugin_env;
|
|
|
|
|
utarray_push_back(plug_mgr->registered_session_plugin_array, &session_plugin_schema);
|
|
|
|
|
return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-08 15:35:29 +08:00
|
|
|
if(sess==NULL)return;
|
2024-08-06 20:37:59 +08:00
|
|
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
2024-05-17 16:55:46 +08:00
|
|
|
if(plug_mgr_rt==NULL)return;
|
2024-08-06 20:37:59 +08:00
|
|
|
#if 0
|
|
|
|
|
int topic_id = -1;
|
|
|
|
|
//FIXME: get topic and tcp data by stellar api
|
2024-05-17 16:55:46 +08:00
|
|
|
switch (packet_get_type(pkt))
|
|
|
|
|
{
|
|
|
|
|
case TCP:
|
|
|
|
|
topic_id=plug_mgr_rt->plug_mgr->tcp_topic_id;
|
|
|
|
|
break;
|
|
|
|
|
case TCP_STREAM:
|
|
|
|
|
topic_id=plug_mgr_rt->plug_mgr->tcp_stream_topic_id;
|
|
|
|
|
break;
|
|
|
|
|
case UDP:
|
|
|
|
|
topic_id=plug_mgr_rt->plug_mgr->udp_topic_id;
|
|
|
|
|
break;
|
|
|
|
|
case CONTROL:
|
|
|
|
|
topic_id=plug_mgr_rt->plug_mgr->control_packet_topic_id;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
break;
|
2024-08-06 20:37:59 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
plug_mgr_rt->pub_session_msg_cnt=0;
|
|
|
|
|
session_mq_publish_message_with_priority(sess, topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
|
|
|
|
|
int tid=stellar_get_current_thread_index();
|
|
|
|
|
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
|
2024-05-17 16:55:46 +08:00
|
|
|
#endif
|
2024-08-06 20:37:59 +08:00
|
|
|
|
|
|
|
|
plug_mgr_rt->pub_session_msg_cnt=0;
|
|
|
|
|
plugin_manager_scratch_session_set(sess);
|
2024-05-17 16:55:46 +08:00
|
|
|
struct tcp_segment *seg;
|
2024-04-11 19:44:02 +08:00
|
|
|
enum session_type type = session_get_type(sess);
|
2024-04-11 16:30:21 +08:00
|
|
|
|
2024-04-11 19:44:02 +08:00
|
|
|
if (packet_is_ctrl(pkt))
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->control_packet_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
|
2024-04-11 16:30:21 +08:00
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
2024-04-11 19:44:02 +08:00
|
|
|
switch (type)
|
|
|
|
|
{
|
|
|
|
|
case SESSION_TYPE_TCP:
|
2024-08-06 20:37:59 +08:00
|
|
|
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
|
2024-08-13 15:26:32 +08:00
|
|
|
if ((seg = session_get_tcp_segment(sess)) != NULL)
|
2024-08-13 09:45:35 +08:00
|
|
|
{
|
|
|
|
|
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id, (void *)seg, STELLAR_MQ_PRIORITY_HIGH);
|
|
|
|
|
}
|
|
|
|
|
break;
|
2024-04-11 19:44:02 +08:00
|
|
|
case SESSION_TYPE_UDP:
|
2024-08-06 20:37:59 +08:00
|
|
|
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
|
2024-04-11 19:44:02 +08:00
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
assert(0);
|
|
|
|
|
break;
|
|
|
|
|
}
|
2024-04-11 16:30:21 +08:00
|
|
|
}
|
2024-05-17 16:55:46 +08:00
|
|
|
//TODO: check TCP topic active subscirber num, if 0, return disable assembler state, to reduce tcp reassemble overhead
|
2024-08-06 20:37:59 +08:00
|
|
|
int tid=stellar_get_current_thread_index();
|
|
|
|
|
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
|
|
|
|
|
|
2024-08-13 15:26:32 +08:00
|
|
|
while ((seg = session_get_tcp_segment(sess)) != NULL)
|
|
|
|
|
{
|
|
|
|
|
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id, (void *)seg, STELLAR_MQ_PRIORITY_HIGH);
|
|
|
|
|
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
|
|
|
|
|
}
|
|
|
|
|
plugin_manager_scratch_session_set(NULL);
|
2024-08-06 20:37:59 +08:00
|
|
|
|
2024-08-13 15:26:32 +08:00
|
|
|
return;
|
2024-04-11 16:30:21 +08:00
|
|
|
}
|
|
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
|
2024-04-11 16:30:21 +08:00
|
|
|
{
|
2024-08-08 15:35:29 +08:00
|
|
|
if(sess==NULL)return;
|
2024-08-06 20:37:59 +08:00
|
|
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
2024-05-17 16:55:46 +08:00
|
|
|
if(plug_mgr_rt==NULL)return;
|
|
|
|
|
plugin_manager_scratch_session_set(sess);
|
2024-08-06 20:37:59 +08:00
|
|
|
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH);
|
|
|
|
|
int tid=stellar_get_current_thread_index();
|
|
|
|
|
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
|
|
|
|
|
plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
|
|
|
|
|
stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
|
2024-05-17 16:55:46 +08:00
|
|
|
plugin_manager_scratch_session_set(NULL);
|
2024-08-06 20:37:59 +08:00
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void plugin_manager_on_session_closing(struct session *sess)
|
|
|
|
|
{
|
2024-08-08 15:35:29 +08:00
|
|
|
if(sess==NULL)return;
|
2024-08-06 20:37:59 +08:00
|
|
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
|
|
|
|
if(plug_mgr_rt==NULL)return;
|
|
|
|
|
plug_mgr_rt->pub_session_msg_cnt=0;// reset pub_msg_cnt
|
|
|
|
|
switch (session_get_type(sess))
|
|
|
|
|
{
|
|
|
|
|
case SESSION_TYPE_TCP:
|
|
|
|
|
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
|
|
|
|
|
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL, STELLAR_MQ_PRIORITY_HIGH);
|
|
|
|
|
break;
|
|
|
|
|
case SESSION_TYPE_UDP:
|
|
|
|
|
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
int tid=stellar_get_current_thread_index();
|
|
|
|
|
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, NULL);
|
|
|
|
|
plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
|
|
|
|
|
stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
|
2024-05-17 16:55:46 +08:00
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void stellar_session_plugin_dettach_current_session(struct session *sess)
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
2024-05-17 16:55:46 +08:00
|
|
|
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plug_mgr_rt->current_session_plugin_id);
|
|
|
|
|
if(session_plugin_schema==NULL)return;
|
2024-08-06 20:37:59 +08:00
|
|
|
struct stellar_mq_topic_schema *topic=NULL;
|
2024-05-17 16:55:46 +08:00
|
|
|
unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info);
|
2024-08-06 20:37:59 +08:00
|
|
|
//Won't Do: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch
|
|
|
|
|
//allow plugin register with null ctx_new and ctx_free
|
2024-05-17 16:55:46 +08:00
|
|
|
if(plug_mgr_rt->session_mq_status)
|
|
|
|
|
{
|
|
|
|
|
for(unsigned int i=0; i < plugin_subscriber_num; i++)
|
|
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
|
|
|
|
|
bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx,session_plugin_sub_info->topic_id, 0);
|
|
|
|
|
topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)session_plugin_sub_info->topic_id);
|
|
|
|
|
session_mq_update_topic_status(plug_mgr_rt, topic);
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
|
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
//dettach in ctx INIT, do not call on_ctx_free immidiately
|
|
|
|
|
if(plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state != INIT && (session_plugin_schema->on_ctx_free))
|
2024-05-17 16:55:46 +08:00
|
|
|
{
|
2024-08-06 20:37:59 +08:00
|
|
|
session_plugin_schema->on_ctx_free(sess, plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx, session_plugin_schema->plugin_env);
|
|
|
|
|
plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx=NULL;
|
2024-05-17 16:55:46 +08:00
|
|
|
}
|
2024-08-06 20:37:59 +08:00
|
|
|
plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state=EXIT;
|
2024-05-17 16:55:46 +08:00
|
|
|
|
2024-08-06 20:37:59 +08:00
|
|
|
return;
|
|
|
|
|
}
|