#include "module_manager_interna.h" #include "stellar/utils.h" #include #include #include #include #include "toml/toml.h" #include "uthash/utlist.h" /******************************************* * module manager API * *******************************************/ struct module_manager *module_manager_new(struct module_hooks mod_hooks[], size_t n_mod, int max_thread_num, const char *toml_path, struct logger *logger) { struct module_manager *mod_mgr = CALLOC(struct module_manager, 1); mod_mgr->config.max_thread_num=max_thread_num; mod_mgr->config.logger=logger; if(toml_path)mod_mgr->config.toml_path=strdup(toml_path); if(mod_hooks==NULL || n_mod==0)return mod_mgr; mod_mgr->descriptors = CALLOC(struct module_descriptor, n_mod); mod_mgr->n_descriptor = n_mod; for (size_t i = 0; i < n_mod; i++) { mod_mgr->descriptors[i].hooks = mod_hooks[i]; if (mod_hooks[i].on_instance_init_cb) { mod_mgr->descriptors[i].mod = mod_hooks[i].on_instance_init_cb(mod_mgr); if (module_manager_get_module(mod_mgr, mod_mgr->descriptors[i].mod->name) == NULL) { mod_mgr->descriptors[i].initialized = true; } else { fprintf(stderr, "Module %s already exists\n", mod_mgr->descriptors[i].mod->name); if (mod_hooks[i].on_instance_exit_cb) mod_hooks[i].on_instance_exit_cb(mod_mgr, mod_mgr->descriptors[i].mod); } } } return mod_mgr; } struct module_manager *module_manager_new_with_toml(const char *toml_path, int max_thread_num, struct logger *logger) { FILE *fp=fopen(toml_path, "r"); if(!fp)return module_manager_new(NULL, 0, max_thread_num, toml_path,logger); toml_table_t *conf = toml_parse_file(fp, NULL, 0); fclose(fp); if(conf==NULL)return module_manager_new(NULL, 0, max_thread_num, toml_path, logger); toml_array_t* mod_array = toml_array_in(conf, "module"); if(mod_array==NULL) { toml_free(conf); return module_manager_new(NULL, 0, max_thread_num, toml_path, logger); } int mod_num = toml_array_nelem(mod_array); struct module_hooks mod_hooks[mod_num]; memset(mod_hooks, 0, sizeof(mod_hooks)); char *path = NULL; char *instance_init_cb_name = NULL, *instance_exit_cb_name = NULL, *thread_init_cb_name = NULL, *thread_exit_cb_name = NULL; for (int i = 0; i < mod_num; i++) { toml_table_t* toml_mod = toml_table_at(mod_array, i); const char *path_raw = toml_raw_in(toml_mod, "path"); const char *init_func_name_raw = toml_raw_in(toml_mod, "init"); const char *exit_func_name_raw = toml_raw_in(toml_mod, "exit"); const char *thread_init_func_name_raw = toml_raw_in(toml_mod, "thread_init"); const char *thread_exit_func_name_raw = toml_raw_in(toml_mod, "thread_exit"); toml_rtos(path_raw, &path); void* handle = dlopen(path, RTLD_NOW|RTLD_LAZY|RTLD_GLOBAL); if(path)FREE(path); if (!handle) { break; } toml_rtos(init_func_name_raw, &instance_init_cb_name); if (instance_init_cb_name) { mod_hooks[i].on_instance_init_cb = (module_on_instance_init_func *)dlsym(handle, instance_init_cb_name); //TODO: logger error FREE(instance_init_cb_name); } toml_rtos(exit_func_name_raw, &instance_exit_cb_name); if (instance_exit_cb_name) { mod_hooks[i].on_instance_exit_cb = (module_on_instance_exit_func *)dlsym(handle, instance_exit_cb_name); FREE(instance_exit_cb_name); } toml_rtos(thread_init_func_name_raw, &thread_init_cb_name); if (thread_init_cb_name) { mod_hooks[i].on_thread_init_cb = (module_on_thread_init_func *)dlsym(handle, thread_init_cb_name); FREE(thread_init_cb_name); } toml_rtos(thread_exit_func_name_raw, &thread_exit_cb_name); if (thread_exit_cb_name) { mod_hooks[i].on_thread_exit_cb = (module_on_thread_exit_func *)dlsym(handle, thread_exit_cb_name); FREE(thread_exit_cb_name); } } toml_free(conf); return module_manager_new(mod_hooks, mod_num, max_thread_num, toml_path, logger); } void module_manager_free(struct module_manager *mod_mgr) { if(mod_mgr==NULL)return; if(mod_mgr->config.toml_path)FREE(mod_mgr->config.toml_path); if (mod_mgr->descriptors) { for (int i = 0; i < mod_mgr->n_descriptor; i++) { if (mod_mgr->descriptors[i].hooks.on_instance_exit_cb != NULL && mod_mgr->descriptors[i].initialized) { mod_mgr->descriptors[i].hooks.on_instance_exit_cb(mod_mgr, mod_mgr->descriptors[i].mod); } } FREE(mod_mgr->descriptors); } struct polling_node *node, *tmp; LL_FOREACH_SAFE(mod_mgr->node_list, node, tmp) { LL_DELETE(mod_mgr->node_list, node); FREE(node); } FREE(mod_mgr); return; } int module_manager_get_max_thread_num(struct module_manager*mod_mgr) { if(mod_mgr==NULL)return -1; return mod_mgr->config.max_thread_num; } struct logger *module_manager_get_logger(struct module_manager *mod_mgr) { if(mod_mgr==NULL)return NULL; return mod_mgr->config.logger; } const char *module_manager_get_toml_path(struct module_manager *mod_mgr) { if(mod_mgr==NULL)return NULL; return mod_mgr->config.toml_path; } __thread int local_thread_id=-1; __thread struct mq_runtime *local_mq_rt=NULL; int module_manager_get_thread_id(struct module_manager* mod_mgr __unused) { return local_thread_id; } struct mq_runtime *module_manager_get_mq_runtime(struct module_manager *mod_mgr __unused) { return local_mq_rt; } void module_manager_register_thread(struct module_manager* mod_mgr, int thread_id) { local_thread_id=thread_id; for(int i=0; in_descriptor; i++) { if(mod_mgr->descriptors[i].mod == NULL)break; if(mod_mgr->descriptors[i].hooks.on_thread_init_cb && mod_mgr->descriptors[i].initialized) { mod_mgr->descriptors[i].hooks.on_thread_init_cb(mod_mgr, thread_id, mod_mgr->descriptors[i].mod); } } return; } void module_manager_unregister_thread(struct module_manager *mod_mgr, int thread_id) { assert(local_thread_id==thread_id); for(int i=0; in_descriptor; i++) { if(mod_mgr->descriptors[i].mod == NULL)break; if(mod_mgr->descriptors[i].hooks.on_thread_exit_cb && mod_mgr->descriptors[i].initialized) { mod_mgr->descriptors[i].hooks.on_thread_exit_cb(mod_mgr, thread_id, mod_mgr->descriptors[i].mod); } } local_thread_id=-1; local_mq_rt=NULL; return; } struct module *module_manager_get_module(struct module_manager *mod_mgr, const char *module_name) { if(mod_mgr==NULL || module_name == NULL)return NULL; if (mod_mgr->descriptors) { for(int i=0; in_descriptor; i++) { if(mod_mgr->descriptors[i].mod == NULL)continue; if(strcmp(mod_mgr->descriptors[i].mod->name, module_name)==0 && mod_mgr->descriptors[i].initialized) { return mod_mgr->descriptors[i].mod; } } } return NULL; } /******************************************* * stellar module API * *******************************************/ struct module *module_new(const char *name, void *ctx) { struct module *mod = CALLOC(struct module, 1); memcpy(mod->name, name, MIN(NAME_MAX, strlen(name))); mod->ctx=ctx; return mod; } void module_free(struct module *mod) { if(mod==NULL)return; FREE(mod); return; } void * module_get_ctx(struct module *mod) { if(mod==NULL)return NULL; return mod->ctx; } void module_set_ctx(struct module *mod, void *ctx) { if(mod==NULL)return; mod->ctx=ctx; return; } const char *module_get_name(struct module* mod) { if(mod==NULL)return NULL; return mod->name; } void module_set_name(struct module* mod, const char *name) { if(mod==NULL)return; memcpy(mod->name, name, MIN(NAME_MAX, strlen(name))); return; } /******************************************* * polling API * *******************************************/ int module_manager_register_polling_node(struct module_manager *mod_mgr, on_polling_callback *on_polling, void *polling_arg) { if(mod_mgr == NULL|| on_polling == NULL)return -1; struct polling_node *node = CALLOC(struct polling_node, 1); node->on_polling = on_polling; node->polling_arg = polling_arg; LL_APPEND(mod_mgr->node_list, node); return 0; } void module_manager_polling_dispatch(struct module_manager *mod_mgr) { if(mod_mgr==NULL)return; struct polling_node *node; LL_FOREACH(mod_mgr->node_list, node) { if (node->on_polling) { node->on_polling(mod_mgr, node->polling_arg); } } return; }