This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
stellar-stellar/infra/plugin_manager/plugin_manager.c

361 lines
13 KiB
C
Raw Normal View History

#include "plugin_manager_interna.h"
#include "stellar/utils.h"
#include "toml/toml.h"
#include <dlfcn.h>
#include <stdbool.h>
#include "stellar_core.h"
#if 0
/* Add `increment` to the pub_packet_msg_cnt counter kept in per-thread slot `tid`. */
void stellar_per_stage_message_counter_incby(struct plugin_manager_schema *plug_mgr, int tid, long long increment)
{
    (plug_mgr->per_thread_data + tid)->pub_packet_msg_cnt += increment;
}
/*
 * Overwrite the pub_packet_msg_cnt counter kept in per-thread slot `tid`.
 * Despite its name, `increment` is the new absolute value, not a delta.
 */
void stellar_per_stage_message_counter_set(struct plugin_manager_schema *plug_mgr, int tid, long long increment)
{
    (plug_mgr->per_thread_data + tid)->pub_packet_msg_cnt = increment;
}
/*
 * Report whether the pub_packet_msg_cnt counter of per-thread slot `tid` has
 * reached (>=) plug_mgr->max_message_dispatch.
 */
bool stellar_per_stage_message_counter_overlimt(struct plugin_manager_schema *plug_mgr, int tid)
{
    return (plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch) ? true : false;
}
#endif
/*
 * utarray element descriptor for the array of loaded plugin specs
 * (handed to utarray_new() in plugin_manager_init). Only the element size is
 * set; the three remaining slots (utarray's init/copy/dtor hooks) are NULL.
 */
UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
/*
 * Resolve one entry of the "plugin" array into `spec`: read the "path",
 * "init" and "exit" string keys, dlopen() the library at "path" and look up
 * the two symbols.
 *
 * Returns 0 when the library was opened. A symbol that cannot be resolved is
 * only reported on stderr and leaves the matching callback NULL.
 * Returns -1 when the entry is not a table, a key is missing or not a string,
 * or the library cannot be opened.
 *
 * The dlopen() handle is not closed here: the callbacks stored in `spec`
 * were obtained from it with dlsym().
 */
static int plugin_spec_load_one(toml_table_t *plugin, struct plugin_specific *spec)
{
    char *path = NULL;
    char *init_func_name = NULL;
    char *exit_func_name = NULL;
    void *handle = NULL;
    int ret = -1;

    /* toml_table_at() yields NULL for an element that is not a table. */
    if (plugin == NULL)
    {
        fprintf(stderr, "Invalid plugin entry: not a table\n");
        return -1;
    }

    /*
     * The || chain stops at the first failure, so earlier strings may already
     * be allocated; every exit path below goes through the FREE() calls.
     */
    if (toml_rtos(toml_raw_in(plugin, "path"), &path) ||
        toml_rtos(toml_raw_in(plugin, "init"), &init_func_name) ||
        toml_rtos(toml_raw_in(plugin, "exit"), &exit_func_name))
    {
        fprintf(stderr, "Invalid plugin entry: path, init and exit must be strings\n");
        goto out;
    }

    /* POSIX requires exactly one of RTLD_LAZY / RTLD_NOW; bind eagerly. */
    handle = dlopen(path, RTLD_NOW | RTLD_GLOBAL);
    if (!handle)
    {
        fprintf(stderr, "Error loading plugin %s: %s\n", path, dlerror());
        goto out;
    }

    spec->load_cb = (plugin_on_load_func *) dlsym(handle, init_func_name);
    if (!spec->load_cb)
    {
        fprintf(stderr, "Could not load init function %s: %s\n", init_func_name, dlerror());
    }

    spec->unload_cb = (plugin_on_unload_func *) dlsym(handle, exit_func_name);
    if (!spec->unload_cb)
    {
        fprintf(stderr, "Could not load exit function %s: %s\n", exit_func_name, dlerror());
    }
    ret = 0;

out:
    if (path != NULL)
    {
        FREE(path);
    }
    if (init_func_name != NULL)
    {
        FREE(init_func_name);
    }
    if (exit_func_name != NULL)
    {
        FREE(exit_func_name);
    }
    return ret;
}

/*
 * Parse the TOML file at `toml_conf_path` and load every entry of its
 * "plugin" array.
 *
 * On success returns a CALLOC'ed array (owned by the caller) and stores its
 * length in *spec_num. On any failure (NULL path, unreadable file, parse
 * error, missing or empty "plugin" array, malformed entry, dlopen failure,
 * allocation failure) returns NULL with *spec_num == 0, so the count always
 * matches the returned array and callers never index a NULL result.
 */
static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num)
{
    struct plugin_specific *plugins = NULL;
    toml_array_t *plugin_array = NULL;
    int plugin_cnt = 0;
    char errbuf[256];

    *spec_num = 0;
    if (toml_conf_path == NULL)
    {
        return NULL;
    }

    FILE *fp = fopen(toml_conf_path, "r");
    if (fp == NULL)
    {
        return NULL;
    }
    toml_table_t *conf = toml_parse_file(fp, errbuf, sizeof(errbuf));
    fclose(fp);
    if (!conf)
    {
        fprintf(stderr, "Error parsing toml: %s\n", errbuf);
        return NULL;
    }

    plugin_array = toml_array_in(conf, "plugin");
    if (plugin_array == NULL)
    {
        goto PLUGIN_SPEC_LOAD_ERROR;
    }

    plugin_cnt = toml_array_nelem(plugin_array);
    if (plugin_cnt <= 0)
    {
        goto PLUGIN_SPEC_LOAD_ERROR;
    }

    plugins = CALLOC(struct plugin_specific, plugin_cnt);
    if (plugins == NULL)
    {
        goto PLUGIN_SPEC_LOAD_ERROR;
    }

    for (int i = 0; i < plugin_cnt; i++)
    {
        if (plugin_spec_load_one(toml_table_at(plugin_array, i), &plugins[i]) != 0)
        {
            goto PLUGIN_SPEC_LOAD_ERROR;
        }
    }

    /* Publish the count only once every entry has loaded. */
    *spec_num = plugin_cnt;
    toml_free(conf);
    return plugins;

PLUGIN_SPEC_LOAD_ERROR:
    toml_free(conf);
    if (plugins != NULL)
    {
        FREE(plugins);
    }
    return NULL;
}
#if 0
/*
 * Allocate one zeroed plugin_manager_per_thread_data element per worker
 * thread reported by stellar_get_worker_thread_num(). Returns NULL when `st`
 * is NULL; otherwise returns whatever CALLOC yields.
 */
static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st)
{
    struct plugin_manager_per_thread_data *slots = NULL;

    if (st != NULL)
    {
        int worker_cnt = stellar_get_worker_thread_num(st);
        slots = CALLOC(struct plugin_manager_per_thread_data, worker_cnt);
    }
    return slots;
}
/*
 * Release the per-thread array: call exdata_handle_free() on each slot's
 * exdata_array, then free the array itself. Does nothing when either
 * argument is NULL.
 */
static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread_data *per_thread_data, struct stellar *st)
{
    if (per_thread_data == NULL || st == NULL)
    {
        return;
    }

    const int worker_cnt = stellar_get_worker_thread_num(st);
    for (int idx = 0; idx < worker_cnt; idx++)
    {
        exdata_handle_free(per_thread_data[idx].exdata_array);
    }
    FREE(per_thread_data);
}
#endif
/*
 * Create the plugin manager for `st`, load the plugins listed in
 * `plugin_spec_file_path` and run each plugin's load callback.
 *
 * The new manager is attached to `st` via stellar_set_plugin_manger() before
 * any load callback runs. Each load callback's return value is stored as
 * plugin_ctx. Specs whose load callback could not be resolved (load_cb ==
 * NULL) are not stored, so plugin_manager_exit() never calls their unload
 * callback.
 *
 * If the spec file cannot be loaded, the manager is still created with no
 * plugins. Returns NULL only when the loader reports a negative count or the
 * manager cannot be allocated.
 */
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
{
    int spec_num = 0;
    struct plugin_specific *specs = plugin_specs_load(plugin_spec_file_path, &spec_num);
    if (spec_num < 0)
    {
        if (specs != NULL)
        {
            FREE(specs);
        }
        return NULL;
    }
    /*
     * The loader can return NULL after it has already stored a positive
     * count (e.g. dlopen failure); never index a NULL array.
     */
    if (specs == NULL)
    {
        spec_num = 0;
    }

    struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1);
    if (plug_mgr == NULL)
    {
        if (specs != NULL)
        {
            FREE(specs);
        }
        return NULL;
    }
    //plug_mgr->max_message_dispatch=max_msg_per_stage;
    if (spec_num > 0)
    {
        utarray_new(plug_mgr->plugin_load_specs_array, &plugin_specs_icd);
        utarray_reserve(plug_mgr->plugin_load_specs_array, spec_num);
    }
    plug_mgr->st = st;
    stellar_set_plugin_manger(st, plug_mgr);
    //plug_mgr->exdata_schema=exdata_schema_new();
    for (int i = 0; i < spec_num; i++)
    {
        if (specs[i].load_cb != NULL)
        {
            specs[i].plugin_ctx = specs[i].load_cb(st);
            /* utarray stores its own copy of the element, so specs can be freed below. */
            utarray_push_back(plug_mgr->plugin_load_specs_array, &specs[i]);
        }
    }
    if (specs != NULL)
    {
        FREE(specs);
    }
    //plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st);
    return plug_mgr;
}
/*
 * Tear down a plugin manager: call every stored spec's unload callback
 * (when set) with its plugin_ctx, free the spec array, then free the manager
 * itself. A NULL `plug_mgr` is ignored.
 */
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
{
    if (plug_mgr == NULL)
    {
        return;
    }

    if (plug_mgr->plugin_load_specs_array)
    {
        struct plugin_specific *spec = NULL;
        for (spec = (struct plugin_specific *)utarray_next(plug_mgr->plugin_load_specs_array, spec);
             spec != NULL;
             spec = (struct plugin_specific *)utarray_next(plug_mgr->plugin_load_specs_array, spec))
        {
            if (spec->unload_cb)
            {
                spec->unload_cb(spec->plugin_ctx);
            }
        }
        utarray_free(plug_mgr->plugin_load_specs_array);
    }
#if 0
    if(plug_mgr->stellar_mq_schema_array)
    {
        for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++)
        {
            stellar_mq_destroy_topic( plug_mgr->st, i);
        }
        utarray_free(plug_mgr->stellar_mq_schema_array);
    }
    //if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
    if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
    if(plug_mgr->registered_packet_plugin_array)
    {
        struct registered_plugin_schema *s = NULL;
        while ((s = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s)))
        {
            if(s->registed_mq_subscriber_info)utarray_free(s->registed_mq_subscriber_info);
        }
        utarray_free(plug_mgr->registered_packet_plugin_array);
    }
#endif
    //plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
    //exdata_schema_free(plug_mgr->exdata_schema);
    FREE(plug_mgr);
}
/*******************************
* STELLAR EXDATA *
*******************************/
#if 0
/*
 * Register a named exdata slot on the plugin manager's exdata schema by
 * forwarding to exdata_new_index(). Returns -1 when `st` or `name` is NULL or
 * the manager has no exdata schema; otherwise returns exdata_new_index()'s
 * result.
 */
int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *free_arg)
{
    if (st != NULL && name != NULL)
    {
        struct plugin_manager_schema *mgr = stellar_get_plugin_manager(st);
        if (mgr->exdata_schema != NULL)
        {
            return exdata_new_index(mgr->exdata_schema, name, free_func, free_arg);
        }
    }
    return -1;
}
/*******************************
* PACKET EXDATA *
*******************************/
/*
 * Return the exdata handle stored in the current thread's per-thread slot,
 * creating it with exdata_handle_new() on first use. Returns NULL when the
 * manager or its exdata schema is NULL.
 */
static struct exdata_runtime *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr)
{
    if (plug_mgr == NULL || plug_mgr->exdata_schema == NULL)
    {
        return NULL;
    }

    const int thread_idx = stellar_get_current_thread_index();
    if (!plug_mgr->per_thread_data[thread_idx].exdata_array)
    {
        plug_mgr->per_thread_data[thread_idx].exdata_array = exdata_handle_new(plug_mgr->exdata_schema);
    }
    return plug_mgr->per_thread_data[thread_idx].exdata_array;
}
/*
 * Pass the current thread's packet exdata handle to exdata_handle_reset().
 * Does nothing when the manager or its exdata schema is NULL.
 */
static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr)
{
    if (plug_mgr != NULL && plug_mgr->exdata_schema != NULL)
    {
        exdata_handle_reset(per_thread_packet_exdata_arrary_get(plug_mgr));
    }
}
/*
 * Store `ex_ptr` at exdata index `idx` in the current thread's packet exdata
 * handle. The plugin manager is taken from the packet's user data.
 * Returns -1 when `pkt` is NULL, otherwise exdata_set()'s result.
 */
int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr)
{
    if (pkt == NULL)
    {
        return -1;
    }

    struct plugin_manager_schema *mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
    struct exdata_runtime *handle = per_thread_packet_exdata_arrary_get(mgr);
    return exdata_set(handle, idx, ex_ptr);
}
/*
 * Fetch the pointer stored at exdata index `idx` in the current thread's
 * packet exdata handle. The plugin manager is taken from the packet's user
 * data. Returns NULL when `pkt` is NULL, otherwise exdata_get()'s result.
 */
void *packet_exdata_get(struct packet *pkt, int idx)
{
    if (pkt == NULL)
    {
        return NULL;
    }

    struct plugin_manager_schema *mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
    struct exdata_runtime *handle = per_thread_packet_exdata_arrary_get(mgr);
    return exdata_get(handle, idx);
}
/*******************************
* SESSION EXDATA *
*******************************/
/*
 * Store `ex_ptr` at exdata index `idx` in the exdata handle kept as the
 * session's user data. Returns -1 when the session has no such handle,
 * otherwise exdata_set()'s result.
 */
int session_exdata_set(struct session *sess, int idx, void *ex_ptr)
{
    struct exdata_runtime *handle = (struct exdata_runtime *)session_get_user_data(sess);

    return (handle == NULL) ? -1 : exdata_set(handle, idx, ex_ptr);
}
/*
 * Fetch the pointer stored at exdata index `idx` in the exdata handle kept as
 * the session's user data. Returns NULL when the session has no such handle,
 * otherwise exdata_get()'s result.
 */
void *session_exdata_get(struct session *sess, int idx)
{
    struct exdata_runtime *handle = (struct exdata_runtime *)session_get_user_data(sess);

    if (handle != NULL)
    {
        return exdata_get(handle, idx);
    }
    return NULL;
}
#endif
#if 0
/*******************************
* PLUGIN MANAGER SESSION RUNTIME *
*******************************/
struct exdata_runtime *session_exdata_runtime_new(struct stellar *st)
{
2024-09-06 11:56:34 +08:00
struct plugin_manager_schema *plug_mgr=stellar_get_plugin_manager(st);
return exdata_handle_new(plug_mgr->exdata_schema);
}
/* Release an exdata handle by forwarding it to exdata_handle_free(). */
void session_exdata_runtime_free(struct exdata_runtime *exdata_h)
{
    exdata_handle_free(exdata_h);
}
/*********************************************
* PLUGIN MANAGER PLUGIN *
*********************************************/
/*
 * utarray element descriptor for the registered packet-plugin array
 * (handed to utarray_new() in stellar_plugin_register). Only the element size
 * is set; the three remaining slots (utarray's init/copy/dtor hooks) are NULL.
 */
UT_icd registered_plugin_array_icd = {sizeof(struct registered_plugin_schema), NULL, NULL, NULL};
/*
 * Register a packet plugin: append an entry holding the input-stage and
 * output-stage callbacks plus `plugin_env` to the manager's packet-plugin
 * array, creating the array on first use.
 *
 * Returns the plugin id, which is the index of the new entry in that array.
 */
int stellar_plugin_register(struct stellar *st, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env)
{
    struct plugin_manager_schema *mgr = stellar_get_plugin_manager(st);
    struct registered_plugin_schema entry;

    /* Lazily create the array the first time a plugin registers. */
    if (!mgr->registered_packet_plugin_array)
    {
        utarray_new(mgr->registered_packet_plugin_array, &registered_plugin_array_icd);
    }

    memset(&entry, 0, sizeof(entry));
    entry.plugin_env = plugin_env;
    entry.on_packet[PACKET_STAGE_INPUT] = on_packet_input;
    entry.on_packet[PACKET_STAGE_OUTPUT] = on_packet_output;
    utarray_push_back(mgr->registered_packet_plugin_array, &entry);

    /* The entry was appended, so its index is the new length minus one. */
    return (utarray_len(mgr->registered_packet_plugin_array)-1);
}
static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt, enum packet_stage in_out)
{
2024-09-06 11:56:34 +08:00
if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_plugin_schema *p=NULL;
//int tid=stellar_get_current_thread_index();
//stellar_per_stage_message_counter_set(plug_mgr, tid, 0);
2024-09-06 11:56:34 +08:00
while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
{
if(p->on_packet[in_out])
{
p->on_packet[in_out](pkt, p->plugin_env);
}
}
//stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue);
return;
}
void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
{
plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_INPUT);
}
void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
{
2024-09-06 11:56:34 +08:00
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT);
//int tid=stellar_get_current_thread_index();
//stellar_per_stage_message_counter_set(plug_mgr, tid, -1);
//stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue,
// plug_mgr->stellar_mq_schema_array);
//per_thread_packet_exdata_arrary_clean(plug_mgr);
}
/*********************************************
* PLUGIN MANAGER POLLING PLUGIN *
*********************************************/
/*
 * utarray element descriptor for the registered polling-plugin array
 * (handed to utarray_new() in stellar_on_polling_register). Only the element
 * size is set; the three remaining slots (utarray's init/copy/dtor hooks) are
 * NULL.
 */
UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL};
/*
 * Register a polling plugin: append an entry holding `on_polling` and
 * `plugin_env` to the manager's polling-plugin array, creating the array on
 * first use.
 *
 * Returns the index of the new entry in that array.
 */
int stellar_on_polling_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env)
{
    struct plugin_manager_schema *mgr = stellar_get_plugin_manager(st);
    struct registered_polling_plugin_schema entry;

    /* Lazily create the array the first time a polling plugin registers. */
    if (!mgr->registered_polling_plugin_array)
    {
        utarray_new(mgr->registered_polling_plugin_array, &registered_polling_plugin_array_icd);
    }

    memset(&entry, 0, sizeof(entry));
    entry.plugin_env = plugin_env;
    entry.on_polling = on_polling;
    utarray_push_back(mgr->registered_polling_plugin_array, &entry);

    /* The entry was appended, so its index is the new length minus one. */
    return (utarray_len(mgr->registered_polling_plugin_array)-1);
}
/*
 * Call every registered polling callback, in registration order, with its
 * plugin_env.
 *
 * Returns 1 when at least one callback returned 1, otherwise 0 (also 0 when
 * the manager or its polling-plugin array is NULL). All callbacks run even
 * after one has returned 1.
 */
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
{
    if (plug_mgr == NULL || plug_mgr->registered_polling_plugin_array == NULL)
    {
        return 0;
    }

    int any_reported = 0;
    struct registered_polling_plugin_schema *entry = NULL;
    for (entry = (struct registered_polling_plugin_schema *)utarray_next(plug_mgr->registered_polling_plugin_array, entry);
         entry != NULL;
         entry = (struct registered_polling_plugin_schema *)utarray_next(plug_mgr->registered_polling_plugin_array, entry))
    {
        if (!entry->on_polling)
        {
            continue;
        }
        if (entry->on_polling(entry->plugin_env) == 1)
        {
            any_reported = 1;
        }
    }
    return any_reported;
}
#endif