【新增】插件运行数据统计

【修改】允许插件注册空的message topic
This commit is contained in:
niubinghui
2024-08-29 18:20:10 +08:00
parent ac857cc80a
commit 59b8dfa2a2
5 changed files with 231 additions and 113 deletions

View File

@@ -36,4 +36,6 @@ struct lua_plugin_manage_schema;
struct lua_plugin_manage_schema *lua_plugin_manage_init(struct stellar *st, int specific_count, struct lua_config_specific *specifics);
int lua_plugin_manage_load_one_specific(struct lua_plugin_manage_schema *schema, struct lua_config_specific *specific);
void lua_plugin_manage_exit(struct lua_plugin_manage_schema *lua_plug_mgr);
void lua_plugin_manage_exit(struct lua_plugin_manage_schema *lua_plug_mgr);
void lua_plugin_get_statistics(int plugin_id, int thread_id, int *new_success, int *new_fail, int *free_success, int *free_fail);

View File

@@ -28,12 +28,17 @@
* int lua_mq_publish_message
* int lua_mq_ignore_message
* int lua_mq_unignore_message
*
* 08-29
* 修改create_topic逻辑, 允许创建一个空的topic
************************************************************************/
#include "lua_plugin_manage_internal.h"
#include "stellar/session.h"
#include "stellar/session_mq.h"
int global_max_plugin_id = 0;
/* ***** ***** ***** ***** ***** ***** */
int lua_plugin_manage_regist(lua_State *state)
{
@@ -123,6 +128,9 @@ int lua_plugin_manage_regist(lua_State *state)
return 1;
}
}
/* 统计记录一下当前最大的plugin_id编号 */
if (plugin_id > global_max_plugin_id)
global_max_plugin_id = plugin_id;
/* 将注册完成的新插件插入到队列中 */
struct lua_plugin new_plugin;
@@ -132,6 +140,7 @@ int lua_plugin_manage_regist(lua_State *state)
new_plugin.ctx_free_ref = ctx_free_id;
utarray_push_back(plugin_env->plugin_array, &new_plugin);
plugin_env->plugin_count += 1;
global_schema->plugin_count += 1;
lua_settop(state, 0);
lua_pushinteger(state, plugin_id);
@@ -265,7 +274,8 @@ int lua_mq_create_topic(lua_State *state)
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TFUNCTION ||
if (((lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TFUNCTION) &&
(lua_type(state, -1) != LUA_TNIL || lua_type(state, -2) != LUA_TNIL)) ||
lua_type(state, -3) != LUA_TSTRING || lua_type(state, -4) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
@@ -273,20 +283,28 @@ int lua_mq_create_topic(lua_State *state)
}
/* 创建对该table的引用, 防止该table被回收 */
int private_ref = luaL_ref(state, LUA_REGISTRYINDEX); /* stack top function */
if (private_ref == LUA_REFNIL)
int private_ref = 0;
if (lua_type(state, -1) != LUA_TNIL)
{
lua_settop(state, 0);
return 0;
private_ref = luaL_ref(state, LUA_REGISTRYINDEX); /* stack top function */
if (private_ref == LUA_REFNIL)
{
lua_settop(state, 0);
return 0;
}
}
/* 导出free message的调用函数 */
int free_ref = luaL_ref(state, LUA_REGISTRYINDEX);
if (free_ref == LUA_REFNIL)
int free_ref = 0;
if (lua_type(state, -1) != LUA_TFUNCTION)
{
luaL_unref(state, LUA_REGISTRYINDEX, private_ref);
lua_settop(state, 0);
return 0;
free_ref = luaL_ref(state, LUA_REGISTRYINDEX);
if (free_ref == LUA_REFNIL)
{
luaL_unref(state, LUA_REGISTRYINDEX, private_ref);
lua_settop(state, 0);
return 0;
}
}
/* 出栈队列名称 */
@@ -307,29 +325,33 @@ int lua_mq_create_topic(lua_State *state)
lua_settop(state, 0);
/* 插入新的元素 */
struct lua_message_mq new_mq;
memset(&new_mq, 0, sizeof(new_mq));
utarray_push_back(global_schema->message_mq_array, &new_mq);
/* 从队列尾部取出最后一个元素 */
struct lua_message_mq *mq = utarray_eltptr(global_schema->message_mq_array, (utarray_len(global_schema->message_mq_array) - 1));
mq->freemessage_ref = free_ref;
mq->mq_private_ref = private_ref;
/* 调用stellar中mq的topic创建函数 */
/* BugFix: 仔细看了代码, 在stellar中没有对name做拷贝处理, 这里name不能释放 */
int topic_id = stellar_session_mq_create_topic(st, (const char *)name, lpm_message_free_func, mq);
/* 创建topic失败, 还原创建topic之前的状态 */
if (topic_id < 0)
int topic_id = -1;
if (private_ref && free_ref)
{
utarray_pop_back(global_schema->message_mq_array);
luaL_unref(state, LUA_REGISTRYINDEX, free_ref);
luaL_unref(state, LUA_REGISTRYINDEX, private_ref);
if (name)
free(name);
return 0;
}
global_schema->mq_count += 1;
struct lua_message_mq new_mq;
memset(&new_mq, 0, sizeof(new_mq));
utarray_push_back(global_schema->message_mq_array, &new_mq);
/* 从队列尾部取出最后一个元素 */
struct lua_message_mq *mq = utarray_eltptr(global_schema->message_mq_array, (utarray_len(global_schema->message_mq_array) - 1));
mq->freemessage_ref = free_ref;
mq->mq_private_ref = private_ref;
/* 调用stellar中mq的topic创建函数 */
/* BugFix: 仔细看了代码, 在stellar中没有对name做拷贝处理, 这里name不能释放 */
topic_id = stellar_session_mq_create_topic(st, (const char *)name, lpm_message_free_func, mq);
lua_rawgeti(state, LUA_REGISTRYINDEX, mq->mq_private_ref);
/* 创建topic失败, 还原创建topic之前的状态 */
if (topic_id < 0)
{
utarray_pop_back(global_schema->message_mq_array);
luaL_unref(state, LUA_REGISTRYINDEX, free_ref);
luaL_unref(state, LUA_REGISTRYINDEX, private_ref);
if (name)
free(name);
return 0;
}
global_schema->mq_count += 1;
lua_rawgeti(state, LUA_REGISTRYINDEX, mq->mq_private_ref);
#if 0
/* 不检查了, 直接覆盖 */
/* 在传入的table中检查是否存在与mq_private_env同名的元素 */
@@ -340,10 +362,22 @@ int lua_mq_create_topic(lua_State *state)
}
lua_pop(state, 1); /* stack top table */
#endif
/* 在该table中加入新元素 */
lua_pushlightuserdata(state, (void *)mq); /* stack top new_mq */
lua_setfield(state, -2, LUA_MQ_ENV_DEFAULT_KEY); /* stack top table */
mq->topic_id = topic_id;
/* 在该table中加入新元素 */
lua_pushlightuserdata(state, (void *)mq); /* stack top new_mq */
lua_setfield(state, -2, LUA_MQ_ENV_DEFAULT_KEY); /* stack top table */
mq->topic_id = topic_id;
}
else
{
/* 只是创建一个新的空topic */
topic_id = stellar_session_mq_create_topic(st, (const char *)name, NULL, NULL);
if (topic_id < 0)
{
if (name)
free(name);
return 0;
}
}
lua_settop(state, 0);
lua_pushinteger(state, topic_id);
@@ -695,9 +729,10 @@ int lua_mq_ignore_message(lua_State *state)
int topic_id = lua_tointeger(state, -1);
lua_pop(state, 1);
struct session * sess = (struct session *)lua_topointer(state, -1);
if ( !sess ) {
struct session *sess = (struct session *)lua_topointer(state, -1);
if (!sess)
{
lua_settop(state, 0);
return 0;
}
@@ -734,9 +769,10 @@ int lua_mq_unignore_message(lua_State *state)
int topic_id = lua_tointeger(state, -1);
lua_pop(state, 1);
struct session * sess = (struct session *)lua_topointer(state, -1);
if ( !sess ) {
struct session *sess = (struct session *)lua_topointer(state, -1);
if (!sess)
{
lua_settop(state, 0);
return 0;
}
@@ -769,9 +805,10 @@ int lua_mq_topic_is_active(lua_State *state)
/* 倒序获取参数 */
int topic_id = lua_tointeger(state, -1);
lua_pop(state, 1);
struct session * sess = (struct session *)lua_topointer(state, -1);
if ( !sess ) {
struct session *sess = (struct session *)lua_topointer(state, -1);
if (!sess)
{
lua_settop(state, 0);
return 0;
}

View File

@@ -12,10 +12,10 @@
* 1. 实现函数
* void *lpm_ctx_new_func;
* void lpm_ctx_free_func;
*
*
* 08-13
* 1. 由于context结构体修改, 部分函数调用及处理逻辑需要同步修改
*
*
* 08-15
* 1. 实现函数
* void lpm_message_free_func
@@ -25,6 +25,10 @@
#include "stellar/session.h"
/* 内存分配过程中可能存在内存浪费的情况, 其他即使不是Lua的插件也分配了内存 */
/* 但是从调用的角度来说能够提高访问速度, 不需要做ID映射, 浪费内存也是以B为单位 */
struct lua_plugin_statistics *global_plugin_statistics = NULL;
/*
* Function: lpm_ctx_new_func
* Input: | struct session * | sess | 会话信息
@@ -66,6 +70,8 @@ void *lpm_ctx_new_func(
/* 创建新的context失败 */
return NULL;
int statisitc_id = thread_id * global_max_plugin_id + plugin_id;
struct lua_cdata param[3] = {0};
param[0].cdata_type = DATATYPE_POINTER;
param[0].cdata_pointer = sess;
@@ -77,9 +83,11 @@ void *lpm_ctx_new_func(
if (lua_chunk_execute(state, plugin->ctx_new_ref, 3, param, 0, NULL))
{
/* 脚本执行失败 */
++global_plugin_statistics[statisitc_id].new_failed_count;
lua_context_free(state, new_context);
return NULL;
}
++global_plugin_statistics[statisitc_id].new_success_count;
return (void *)new_context;
}
@@ -120,6 +128,8 @@ void lpm_ctx_free_func(
return;
lua_State *state = global_schema->thread_state[thread_id];
int statistic_id = thread_id * global_max_plugin_id + plugin_id;
struct lua_cdata param[3] = {0};
param[0].cdata_type = DATATYPE_POINTER;
param[0].cdata_pointer = sess;
@@ -128,7 +138,10 @@ void lpm_ctx_free_func(
param[2].cdata_type = DATATYPE_TABLE;
param[2].cdata_table = env->private_env_ref;
lua_chunk_execute(state, plugin->ctx_free_ref, 3, param, 0, NULL);
if (lua_chunk_execute(state, plugin->ctx_free_ref, 3, param, 0, NULL))
++global_plugin_statistics[statistic_id].free_failed_count;
else
++global_plugin_statistics[statistic_id].free_success_count;
lua_context_free(state, (struct lua_context *)sess_ctx);
return;

View File

@@ -29,7 +29,7 @@
* 08-06
* 1. 实现函数
* int script_execute;
*
*
* 08-13
* 1. 修改部分创建列表使用的结构
* 2. 实现函数
@@ -43,7 +43,7 @@
#include <string.h>
#include <unistd.h>
struct lua_plugin_manage_schema * global_schema = NULL;
struct lua_plugin_manage_schema *global_schema = NULL;
#if 0
void lua_message_mq_destory(void * elt)
@@ -59,9 +59,10 @@ static UT_icd lua_message_mq_icd = {sizeof(struct lua_message_mq), NULL, NULL, N
void lua_plugin_destory(void *elt)
{
if ( !elt ) return;
struct lua_plugin * plugin = (struct lua_plugin *)elt;
if ( plugin->sub_topic_array )
if (!elt)
return;
struct lua_plugin *plugin = (struct lua_plugin *)elt;
if (plugin->sub_topic_array)
utarray_free(plugin->sub_topic_array);
return;
}
@@ -74,14 +75,16 @@ static UT_icd lua_plugin_icd = {sizeof(struct lua_plugin), NULL, NULL, lua_plugi
* Return: 查找得到的plugin结构
* Description: 在global schema中根据plugin_id查找一个plugin
*/
struct lua_plugin * search_plugin_by_id(int plugin_id)
struct lua_plugin *search_plugin_by_id(int plugin_id)
{
for ( int i = 0; i < global_schema->model_count; ++i) {
struct lua_model * model = &global_schema->model[i];
for (int i = 0; i < global_schema->model_count; ++i)
{
struct lua_model *model = &global_schema->model[i];
struct lua_plugin * plugin = NULL;
while ( (plugin = utarray_next(model->plugin_array, plugin)) ) {
if ( plugin->plugin_id == plugin_id )
struct lua_plugin *plugin = NULL;
while ((plugin = utarray_next(model->plugin_array, plugin)))
{
if (plugin->plugin_id == plugin_id)
return plugin;
else if (plugin->plugin_id > plugin_id)
return NULL;
@@ -97,11 +100,12 @@ struct lua_plugin * search_plugin_by_id(int plugin_id)
* Return: 查找得到的message_mq结构
* Description: 在global schema中根据topic_id查找一个message_mq
*/
struct lua_message_mq * search_message_mq_by_id(int topic_id)
struct lua_message_mq *search_message_mq_by_id(int topic_id)
{
struct lua_message_mq * mq = NULL;
while ((mq = utarray_next(global_schema->message_mq_array, mq))) {
if ( mq->topic_id == topic_id )
struct lua_message_mq *mq = NULL;
while ((mq = utarray_next(global_schema->message_mq_array, mq)))
{
if (mq->topic_id == topic_id)
return mq;
}
return NULL;
@@ -339,7 +343,7 @@ struct lua_plugin_manage_schema *lua_plugin_manage_init(
/* 为将要加载的模块预分配内存 */
new_schema->model_count = specific_count;
/* 如果没有配置, 也应该算是创建成功 */
if ( specific_count == 0 )
if (specific_count == 0)
return new_schema;
new_schema->model = (struct lua_model *)calloc(specific_count, sizeof(struct lua_model));
if (__glibc_unlikely(!new_schema->model))
@@ -389,6 +393,16 @@ struct lua_plugin_manage_schema *lua_plugin_manage_init(
#ifdef LUAPLUGIN_BASIC_UNITTEST
debug_lua_plugin_manage_schema(new_schema);
#endif
/* 初始化全局状态统计 */
global_plugin_statistics = (struct lua_plugin_statistics *)calloc((thread_count * global_max_plugin_id), sizeof(struct lua_plugin_statistics));
if (!global_plugin_statistics)
{
LOGERROR("create plugin statistics failed");
lua_plugin_manage_exit(new_schema);
return NULL;
}
return new_schema;
}
@@ -406,7 +420,8 @@ int lua_plugin_manage_load_one_specific(
if (__glibc_unlikely(!schema || !specific))
return -1;
if ( schema->model ) {
if (schema->model)
{
schema->model = (struct lua_model *)realloc(schema->model, (schema->model_count + 1) * sizeof(struct lua_model));
schema->model_count += 1;
}
@@ -434,6 +449,17 @@ int lua_plugin_manage_load_one_specific(
return -1;
}
}
if (global_plugin_statistics)
free(global_plugin_statistics);
/* 初始化全局状态统计 */
global_plugin_statistics = (struct lua_plugin_statistics *)calloc((schema->state_count * global_max_plugin_id), sizeof(struct lua_plugin_statistics));
if (!global_plugin_statistics)
{
LOGERROR("create plugin statistics failed");
return -1;
}
return 0;
}
@@ -457,7 +483,8 @@ void lua_plugin_manage_exit(struct lua_plugin_manage_schema *lua_plug_mgr)
/* 在状态机中对每一个模块调用对应的卸载函数 */
int call_unload_ret = thread_state_call_unload(lua_plug_mgr->thread_state[state_index],
&lua_plug_mgr->model[model_index]);
if ( call_unload_ret ) {
if (call_unload_ret)
{
LOGERROR("call state unload function failed, ret is %d", call_unload_ret);
}
}
@@ -473,72 +500,102 @@ void lua_plugin_manage_exit(struct lua_plugin_manage_schema *lua_plug_mgr)
}
free(lua_plug_mgr->model);
if ( lua_plug_mgr->message_mq_array )
if (lua_plug_mgr->message_mq_array)
utarray_free(lua_plug_mgr->message_mq_array);
free(lua_plug_mgr);
return;
}
/*
* Function:
* Input:
* Output:
* Return:
* Description: 获取某插件的运行成功及失败次数
*/
void lua_plugin_get_statistics(int plugin_id, int thread_id, int *new_success, int *new_fail, int *free_success, int *free_fail)
{
int statistic_id = thread_id * global_max_plugin_id + plugin_id;
if (new_success)
*new_success = global_plugin_statistics[statistic_id].new_success_count;
if (new_fail)
*new_fail = global_plugin_statistics[statistic_id].new_failed_count;
if (free_success)
*free_success = global_plugin_statistics[statistic_id].free_success_count;
if (free_fail)
*free_fail = global_plugin_statistics[statistic_id].free_failed_count;
return;
}
#ifdef LUAPLUGIN_BASIC_UNITTEST
void debug_lua_state_stack(lua_State * state, int mod, const char * message)
void debug_lua_state_stack(lua_State *state, int mod, const char *message)
{
int stackcount = lua_gettop(state);
/* nothing in stack */
if ( !stackcount ) {
/* nothing in stack */
if (!stackcount)
{
printf("debug stack, but stack is empty");
return;
}
printf("\n***** begin to debug one lua stack *****\n");
if ( message ) printf("debug here: %s\n", message);
int i = stackcount;
/* print variables one by one */
for ( ; i > 0; --i ) {
/* adjust variables according to mod */
if ( mod ) i = 0 - i;
int type = lua_type(state, i);
printf("stack[%d]: ", i);
switch (type) {
case LUA_TBOOLEAN:
printf(lua_toboolean(state, i) ? "true\n" : "false\n");
break;
case LUA_TNUMBER:
printf("%g\n", lua_tonumber(state, i));
break;
case LUA_TSTRING:
printf("%s\n", lua_tostring(state, i));
break;
default:
printf("this is not normal type, type is: %d[%s]\n", type, lua_typename(state, type));
break;
}
/* adjust variables according to mod */
if ( mod ) i = 0 - i;
}
printf("***** end of debug one lua stack *****\n\n");
printf("\n***** begin to debug one lua stack *****\n");
if (message)
printf("debug here: %s\n", message);
int i = stackcount;
/* print variables one by one */
for (; i > 0; --i)
{
/* adjust variables according to mod */
if (mod)
i = 0 - i;
int type = lua_type(state, i);
printf("stack[%d]: ", i);
switch (type)
{
case LUA_TBOOLEAN:
printf(lua_toboolean(state, i) ? "true\n" : "false\n");
break;
case LUA_TNUMBER:
printf("%g\n", lua_tonumber(state, i));
break;
case LUA_TSTRING:
printf("%s\n", lua_tostring(state, i));
break;
default:
printf("this is not normal type, type is: %d[%s]\n", type, lua_typename(state, type));
break;
}
/* adjust variables according to mod */
if (mod)
i = 0 - i;
}
printf("***** end of debug one lua stack *****\n\n");
}
void debug_lua_plugin_manage_schema(struct lua_plugin_manage_schema * schema)
void debug_lua_plugin_manage_schema(struct lua_plugin_manage_schema *schema)
{
printf("\n***** begin to debug one lua schema *****\n");
printf("schema.st is %p\n", schema->st);
printf("schema state count is %d\n", schema->state_count);
for ( int i = 0; i < schema->state_count; ++i ) {
for (int i = 0; i < schema->state_count; ++i)
{
printf("schema state[%d]pointer is %p\n", i, schema->thread_state[i]);
}
printf("schema model count is %d\n", schema->model_count);
for ( int i = 0; i < schema->model_count; ++i ) {
for (int i = 0; i < schema->model_count; ++i)
{
printf("debug model[%d]\n", i);
printf("array %p, load %d, unload %d, env %d, mark %04x, count %04x\n",
schema->model[i].plugin_array, schema->model[i].load_ref, schema->model[i].unload_ref,
schema->model[i].private_env_ref, schema->model[i].model_mark, schema->model[i].plugin_count);
struct lua_plugin * plugin = NULL;
while ((plugin = utarray_next(schema->model[i].plugin_array, plugin))) {
schema->model[i].plugin_array, schema->model[i].load_ref, schema->model[i].unload_ref,
schema->model[i].private_env_ref, schema->model[i].model_mark, schema->model[i].plugin_count);
struct lua_plugin *plugin = NULL;
while ((plugin = utarray_next(schema->model[i].plugin_array, plugin)))
{
printf("%d, %d, %d\n", plugin->plugin_id, plugin->ctx_new_ref, plugin->ctx_free_ref);
}
}
printf("***** end of debug one lua schema *****\n\n");
printf("***** end of debug one lua schema *****\n\n");
}
#endif

View File

@@ -249,9 +249,6 @@ void lpm_on_session_msg_func(struct session *sess, int topic_id, const void *msg
int lua_chunk_execute(lua_State *state, int ref_id, int pcount, struct lua_cdata *params, int rcount, struct lua_cdata *returns);
/* ***** ***** ***** ***** ***** ***** */
/* 状态机相关的一些数据结构及操作, 实现在lua_plugin_manage.c中 */
extern struct lua_plugin_manage_schema *global_schema;
/* TODO:统计插件的运行情况, 暂时没想好怎么用 */
/* 记录一个插件的运行状态 */
struct lua_plugin_statistics
@@ -265,6 +262,13 @@ struct lua_plugin_statistics
/* ctx_free函数调用失败的次数 */
int free_failed_count;
};
/* 状态机相关的一些数据结构及操作, 实现在lua_plugin_manage.c中 */
extern struct lua_plugin_manage_schema *global_schema;
/* 保存插件的运行情况, 运行次数等信息 */
/* 二维数组修改为一维数组, 方便使用偏移快速查找 */
extern struct lua_plugin_statistics *global_plugin_statistics;
/* 最大插件编号 */
extern int global_max_plugin_id;
#define LUA_MQ_ENV_DEFAULT_KEY "__mqenv_pointer"
#define LUA_MQ_TOPIC_ID_KEY "topic_id"
@@ -290,6 +294,7 @@ struct lua_plugin
UT_array *sub_topic_array;
};
void lua_plugin_destory(void *elt);
/* 根据ID号在整个schema中进行遍历 */
struct lua_plugin *search_plugin_by_id(int plugin_id);
#define MODEL_MARK_INIT_DONE 0x0001
@@ -336,13 +341,17 @@ struct lua_plugin_manage_schema
/* 创建的状态机数量, 状态机数量与线程的个数相同 */
int state_count;
lua_State **thread_state;
/* */
/* 插入模块的数量, 模块的数量与specific的数量相同 */
int model_count;
struct lua_model *model;
/* 注册的消息队列的数量 */
int mq_count;
/* 所有模块中注册的插件总数量 */
int plugin_count;
/* 线程状态机 */
lua_State **thread_state;
/* 所有插入的模块 */
struct lua_model *model;
/* TODO: 创建的所有message topic id理论上应该是连续的, 可以考虑用hash数组, 寻址能更快 */
UT_array *message_mq_array;
};