Plugin management support pm_session_dettach_me and pm_session_dettach_others

Add test cases for pm_session_dettach_me and pm_session_dettach_others
This commit is contained in:
luwenpeng
2022-08-03 19:46:43 +08:00
parent 50111e7cd0
commit 5c790085eb
26 changed files with 1348 additions and 382 deletions

View File

@@ -1,48 +1,49 @@
#include "deps/uthash/uthash.h"
#include "session_manager.h"
#include "plugin_manager_util.h"
#include "plugin_manager_config.h"
#include "plugin_manager_module.h"
#include <assert.h>
#include <errno.h>
#include "uthash/uthash.h"
#include "session_manager.h"
#include "plugin_manager_module.h"
/******************************************************************************
* CallBack Runtime (For Per Session)
******************************************************************************/
struct eventcb_runtime
struct callback_runtime
{
int skip;
void *cb_args;
enum session_event_type event;
fn_session_event_callback *cb;
fn_session_event_callback *event_cb;
fn_session_error_callback *error_cb;
};
struct session_plugin_ctx
{
int current_plugin_index;
int eventcb_num;
struct eventcb_runtime *eventcbs;
int callback_index;
int callback_num;
struct callback_runtime *callbacks;
};
/******************************************************************************
* CallBack Static (For Per Plugin Manager)
******************************************************************************/
struct eventcb_static
struct callback_static
{
enum session_event_type event;
fn_session_event_callback *cb;
fn_session_event_callback *event_cb;
fn_session_error_callback *error_cb;
};
struct plugin_manager_eventcb
{
char session_name[MAX_SESSION_NAME_LENGTH]; // key
int eventcb_num; // val size
struct eventcb_static *eventcbs; // val: dynamic array
int callback_num; // val size
struct callback_static *callbacks; // val: dynamic array
UT_hash_handle hh;
};
@@ -84,15 +85,16 @@ static struct session_plugin_ctx *plugin_manager_create_plugin_ctx(struct plugin
else
{
struct session_plugin_ctx *plug_ctx = safe_alloc(struct session_plugin_ctx, 1);
plug_ctx->eventcb_num = elem->eventcb_num;
plug_ctx->eventcbs = safe_alloc(struct eventcb_runtime, plug_ctx->eventcb_num);
plug_ctx->callback_num = elem->callback_num;
plug_ctx->callbacks = safe_alloc(struct callback_runtime, plug_ctx->callback_num);
for (int i = 0; i < plug_ctx->eventcb_num; i++)
for (int i = 0; i < plug_ctx->callback_num; i++)
{
plug_ctx->eventcbs[i].skip = 0;
plug_ctx->eventcbs[i].event = elem->eventcbs[i].event;
plug_ctx->eventcbs[i].cb = elem->eventcbs[i].cb;
plug_ctx->eventcbs[i].cb_args = NULL;
plug_ctx->callbacks[i].skip = 0;
plug_ctx->callbacks[i].event = elem->callbacks[i].event;
plug_ctx->callbacks[i].event_cb = elem->callbacks[i].event_cb;
plug_ctx->callbacks[i].error_cb = elem->callbacks[i].error_cb;
plug_ctx->callbacks[i].cb_args = NULL;
}
return plug_ctx;
@@ -103,7 +105,7 @@ static void plugin_manager_destory_plugin_ctx(struct session_plugin_ctx *plug_ct
{
if (plug_ctx)
{
safe_free(plug_ctx->eventcbs);
safe_free(plug_ctx->callbacks);
safe_free(plug_ctx);
}
}
@@ -112,26 +114,20 @@ static void plugin_manager_destory_plugin_ctx(struct session_plugin_ctx *plug_ct
* Tools for managing plugins
******************************************************************************/
static int plugin_manager_parse_plugins(struct plugin_manager *plug_mgr, const char *prefix, const char *file)
static int plugin_manager_parse_plugins(struct plugin_manager *plug_mgr, const char *file)
{
char plugin_inf[4096] = {0};
char line_buffer[4096] = {0};
if (strlen(prefix) <= 0)
{
plugin_manager_log(ERROR, "Invalid parameter, plugin config file prefix cannot be empty");
return -1;
}
if (strlen(file) <= 0)
{
plugin_manager_log(ERROR, "Invalid parameter, plugin config file name cannot be empty");
return -1;
}
strcat_prefix_and_file(prefix, file, plugin_inf, sizeof(plugin_inf));
FILE *fp = fopen(plugin_inf, "r");
FILE *fp = fopen(file, "r");
if (fp == NULL)
{
plugin_manager_log(ERROR, "can't open %s, %s", plugin_inf, strerror(errno));
plugin_manager_log(ERROR, "can't open %s, %s", file, strerror(errno));
return -1;
}
@@ -142,6 +138,7 @@ static int plugin_manager_parse_plugins(struct plugin_manager *plug_mgr, const c
memset(line_buffer, 0, sizeof(line_buffer));
continue;
}
line_buffer[strcspn(line_buffer, "\r\n")] = 0;
if (plug_mgr->used_config_num >= MAX_PLUGIN_NUM)
{
@@ -150,7 +147,7 @@ static int plugin_manager_parse_plugins(struct plugin_manager *plug_mgr, const c
}
struct plugin_manager_config *config = plugin_mangager_config_create();
if (plugin_mangager_config_parse(config, prefix, line_buffer) == -1)
if (plugin_mangager_config_parse(config, line_buffer) == -1)
{
plugin_mangager_config_destory(config);
goto err;
@@ -186,6 +183,8 @@ static int plugin_manager_open_plugins(struct plugin_manager *plug_mgr)
{
return -1;
}
plugin_manager_module_dump(module, config);
plug_mgr->modules[plug_mgr->used_module_num] = module;
plug_mgr->used_module_num++;
}
@@ -267,9 +266,9 @@ static void plugin_manager_deparse_plugins(struct plugin_manager *plug_mgr)
* Public API for managing plugins
******************************************************************************/
int plugin_manager_load(struct plugin_manager *plug_mgr, const char *prefix, const char *file)
int plugin_manager_load(struct plugin_manager *plug_mgr, const char *file)
{
if (plugin_manager_parse_plugins(plug_mgr, prefix, file) == -1)
if (plugin_manager_parse_plugins(plug_mgr, file) == -1)
{
return -1;
}
@@ -324,7 +323,7 @@ void plugin_manager_destory(struct plugin_manager *plug_mgr)
{
HASH_DEL(plug_mgr->evcb_htable, elem);
safe_free(elem->eventcbs);
safe_free(elem->callbacks);
safe_free(elem);
}
@@ -337,7 +336,7 @@ void plugin_manager_destory(struct plugin_manager *plug_mgr)
}
}
int plugin_manager_register(struct plugin_manager *plug_mgr, const char *session_name, enum session_event_type event, fn_session_event_callback *cb)
int plugin_manager_register(struct plugin_manager *plug_mgr, const char *session_name, enum session_event_type event, fn_session_event_callback *event_cb, fn_session_error_callback *error_cb)
{
if (strlen(session_name) <= 0)
{
@@ -351,23 +350,30 @@ int plugin_manager_register(struct plugin_manager *plug_mgr, const char *session
return -1;
}
if (cb == NULL)
if (event_cb == NULL)
{
plugin_manager_log(ERROR, "invalid parameter, the callback corresponding to the session name '%s' is null", session_name);
plugin_manager_log(ERROR, "invalid parameter, the event callback corresponding to the session name '%s' is null", session_name);
return -1;
}
if (error_cb == NULL)
{
plugin_manager_log(ERROR, "invalid parameter, the error callback corresponding to the session name '%s' is null", session_name);
return -1;
}
struct plugin_manager_eventcb *elem;
HASH_FIND_STR(plug_mgr->evcb_htable, session_name, elem);
// session_name exists, add a new cb to the end of the eventcbs dynamic array
// session_name exists, add a new cb to the end of the callbacks dynamic array
if (elem)
{
elem->eventcbs = (struct eventcb_static *)realloc(elem->eventcbs, (elem->eventcb_num + 1) * sizeof(struct eventcb_static));
elem->callbacks = (struct callback_static *)realloc(elem->callbacks, (elem->callback_num + 1) * sizeof(struct callback_static));
elem->eventcbs[elem->eventcb_num].event = event;
elem->eventcbs[elem->eventcb_num].cb = cb;
elem->callbacks[elem->callback_num].event = event;
elem->callbacks[elem->callback_num].event_cb = event_cb;
elem->callbacks[elem->callback_num].error_cb = error_cb;
elem->eventcb_num++;
elem->callback_num++;
}
// session_name does not exist, allocate a new node elem, and add elem to the hash table
else
@@ -375,12 +381,13 @@ int plugin_manager_register(struct plugin_manager *plug_mgr, const char *session
elem = safe_alloc(struct plugin_manager_eventcb, 1);
memcpy(elem->session_name, session_name, strlen(session_name));
elem->eventcbs = (struct eventcb_static *)realloc(elem->eventcbs, (elem->eventcb_num + 1) * sizeof(struct eventcb_static));
elem->callbacks = (struct callback_static *)realloc(elem->callbacks, (elem->callback_num + 1) * sizeof(struct callback_static));
elem->eventcbs[elem->eventcb_num].event = event;
elem->eventcbs[elem->eventcb_num].cb = cb;
elem->callbacks[elem->callback_num].event = event;
elem->callbacks[elem->callback_num].event_cb = event_cb;
elem->callbacks[elem->callback_num].error_cb = error_cb;
elem->eventcb_num++;
elem->callback_num++;
HASH_ADD_STR(plug_mgr->evcb_htable, session_name, elem);
}
@@ -401,6 +408,9 @@ void plugin_manager_dispatch(struct plugin_manager *plug_mgr, struct stellar_eve
assert(seesion);
assert(session_name);
char event_str_buffer[1024] = {0};
session_event_type_int2str(event_type, event_str_buffer, 1024);
// the same session may trigger multi times opening events
if (event_type & SESSION_EVENT_OPENING)
{
@@ -418,18 +428,20 @@ void plugin_manager_dispatch(struct plugin_manager *plug_mgr, struct stellar_eve
if (plug_ctx)
{
for (int i = 0; i < plug_ctx->eventcb_num; i++)
for (int i = 0; i < plug_ctx->callback_num; i++)
{
plug_ctx->current_plugin_index = i;
struct eventcb_runtime *runtime = &plug_ctx->eventcbs[i];
if (runtime->skip)
struct callback_runtime *runtime = &plug_ctx->callbacks[i];
if (runtime->skip == 1)
{
plugin_manager_log(DEBUG, "dispatch, skip event_cb: %p, session: %s, event: (%d, %s)", runtime->event_cb, session_name, event_type, event_str_buffer);
continue;
}
if (runtime->event & event_type)
{
runtime->cb(seesion, event_type, packet, payload, payload_len, &runtime->cb_args);
plug_ctx->callback_index = i;
plugin_manager_log(DEBUG, "dispatch, run event_cb: %p, session: %s, event: (%d, %s)", runtime->event_cb, session_name, event_type, event_str_buffer);
runtime->event_cb(seesion, event_type, packet, payload, payload_len, &runtime->cb_args);
}
}
}
@@ -446,6 +458,55 @@ void plugin_manager_dispatch(struct plugin_manager *plug_mgr, struct stellar_eve
}
}
/******************************************************************************
* Public API For Plugin
******************************************************************************/
void pm_session_dettach_me(const struct stellar_session *session)
{
struct session_plugin_ctx *plugin_ctx = stellar_session_get_plugin_ctx(session);
assert(plugin_ctx);
struct callback_runtime *runtime_me = &plugin_ctx->callbacks[plugin_ctx->callback_index];
/*
* Just set the skip flag and don't call this event callback next.
* The plugin is closed before calling pm_session_dettach_me.
*/
runtime_me->skip = 1;
plugin_manager_log(DEBUG, "%p dettach me, disable event_cb: %p, session: %s", runtime_me->event_cb, runtime_me->event_cb, stellar_session_get_name(session));
}
void pm_session_dettach_others(const struct stellar_session *session)
{
struct session_plugin_ctx *plugin_ctx = stellar_session_get_plugin_ctx(session);
assert(plugin_ctx);
struct callback_runtime *runtime_me = &plugin_ctx->callbacks[plugin_ctx->callback_index];
for (int i = 0; i < plugin_ctx->callback_num; i++)
{
if (i != plugin_ctx->callback_index)
{
struct callback_runtime *runtime_other = &plugin_ctx->callbacks[i];
runtime_other->skip = 1;
plugin_manager_log(DEBUG, "%p dettach others, run error_cb: %p, session: %s", runtime_me->event_cb, runtime_other->error_cb, stellar_session_get_name(session));
runtime_other->error_cb(session, ERROR_EVENT_DETTACH, &runtime_other->cb_args);
}
}
}
/******************************************************************************
* Util For Gtest
******************************************************************************/
void *pm_session_get_plugin_pme(const struct stellar_session *session)
{
struct session_plugin_ctx *plugin_ctx = stellar_session_get_plugin_ctx(session);
assert(plugin_ctx);
struct callback_runtime *runtime_me = &plugin_ctx->callbacks[plugin_ctx->callback_index];
return runtime_me->cb_args;
}
/******************************************************************************
* Suppport LUA plugins
******************************************************************************/