From 3b2baf28064a1fcaa9f193bd9b8f13c9c6f98af2 Mon Sep 17 00:00:00 2001 From: niubinghui Date: Thu, 15 Aug 2024 17:39:28 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E6=96=B0=E5=A2=9E=E3=80=91=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0message=E5=A4=84=E7=90=86=E7=9B=B8=E5=85=B3lua?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/lua_plugin_manage.h | 3 + src/Makefile | 14 +- src/lua_binding_functions.c | 782 +++++++++++++++++++++++++++++++ src/lua_binding_functions.h | 52 ++ src/lua_plugin_binding.c | 228 +-------- src/lua_plugin_cfunc.c | 141 +++++- src/lua_plugin_manage.c | 78 ++- src/lua_plugin_manage_internal.h | 56 ++- 8 files changed, 1118 insertions(+), 236 deletions(-) create mode 100644 src/lua_binding_functions.c create mode 100644 src/lua_binding_functions.h diff --git a/include/lua_plugin_manage.h b/include/lua_plugin_manage.h index 3f19788..d7fc073 100644 --- a/include/lua_plugin_manage.h +++ b/include/lua_plugin_manage.h @@ -14,6 +14,9 @@ * lua_plugin_manage_exit * 2. 声明数据结构 * struct lua_plugin_manage_schema + * + * 08-09 + * 1. 修改参数函数原型, 传入参数修改为已经加载的配置信息 ************************************************************************/ #pragma once diff --git a/src/Makefile b/src/Makefile index dc6897e..6249dc5 100644 --- a/src/Makefile +++ b/src/Makefile @@ -5,16 +5,18 @@ TARGET=libluaplugin.so TEST_FLAG = -DLUAPLUGIN_BASIC_UNITTEST -SRC := lua_plugin_binding.c \ - lua_plugin_data.c \ - lua_plugin_cfunc.c \ +SRC := lua_plugin_data.c \ lua_plugin_chunk.c \ + lua_plugin_cfunc.c \ + lua_binding_functions.c \ + lua_plugin_binding.c \ lua_plugin_manage.c -OBJECTS := lua_plugin_binding.o \ - lua_plugin_data.o \ - lua_plugin_cfunc.o \ +OBJECTS := lua_plugin_data.o \ lua_plugin_chunk.o \ + lua_plugin_cfunc.o \ + lua_binding_functions.o \ + lua_plugin_binding.o \ lua_plugin_manage.o INCLUDE = -I$(TOPDIR)/dependence/include -I$(TOPDIR)/include diff --git a/src/lua_binding_functions.c b/src/lua_binding_functions.c new file mode 100644 index 0000000..3d8f3ee --- /dev/null +++ b/src/lua_binding_functions.c @@ -0,0 +1,782 @@ +/************************************************************************* + > File Name: lua_binding_functions.c + > Author: + > Created Time: 2024-08 + > Encoding : UTF-8 + ************************************************************************/ + +/************************************************************************* + * 声明并定义所有需要在lua状态机中绑定的函数 + * version + * [ v0.1 ] + * 08-14 + * 1. 实现函数 + * 新增插件注册函数 + * int lua_plugin_manage_regist + * 新增会话相关函数 + * int lua_session_get_id + * int lua_session_set_id + * int lua_session_get_type + * int lua_session_set_type + * 新增message相关函数 + * int lua_mq_create_topic + * int lua_mq_get_topic_id + * int lua_mq_update_topic + * int lua_mq_destory_topic + * int lua_mq_subscribe_topic + * int lua_mq_topic_is_active + * int lua_mq_publish_message + * int lua_mq_ignore_message + * int lua_mq_unignore_message + ************************************************************************/ +#include "lua_plugin_manage_internal.h" +#include "session_mq.h" + +/* ***** ***** ***** ***** ***** ***** */ +int lua_plugin_manage_regist(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 4) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TFUNCTION || + lua_type(state, -3) != LUA_TFUNCTION || lua_type(state, -4) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + /* 取出处理第四个参数 */ + lua_getfield(state, -1, LUA_PLUGIN_ENV_DEFAULT_KEY); /* stack 4, table中取出对应结构的指针 */ + if (lua_type(state, -1) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + struct lua_model *plugin_env = (struct lua_model *)lua_topointer(state, -1); + lua_pop(state, 2); + // debug_lua_state_stack(state, 0, "here"); + // printf("env pointer is %p\n", plugin_env); + + /* 取出处理第三个参数 */ + int ctx_free_id = luaL_ref(state, LUA_REGISTRYINDEX); /* stack 3 */ + if (ctx_free_id == LUA_REFNIL) + { + lua_settop(state, 0); + return 0; + } + + /* 取出处理第二个参数 */ + int ctx_new_id = luaL_ref(state, LUA_REGISTRYINDEX); /* stack 2 */ + if (ctx_new_id == LUA_REFNIL) + { + luaL_unref(state, LUA_REGISTRYINDEX, ctx_free_id); + lua_settop(state, 0); + return 0; + } + + /* 取出处理第一个参数 */ + struct stellar *st = (struct stellar *)lua_topointer(state, -1); /* stack 1 */ + if (!st) + { + luaL_unref(state, LUA_REGISTRYINDEX, ctx_new_id); + luaL_unref(state, LUA_REGISTRYINDEX, ctx_free_id); + lua_settop(state, 0); + return 0; + } + lua_pop(state, 1); + + /* 在stellar中注册, 获取注册id */ + int plugin_id = stellar_session_plugin_register(st, lpm_ctx_new_func, lpm_ctx_free_func, (void *)plugin_env); +#ifdef LUAPLUGIN_BASIC_UNITTEST + LOGDEBUG("now regist new plugin, plugin id is %d, %d, %d\n", plugin_id, ctx_new_id, ctx_free_id); +#endif + + /* TODO: 如果运行完全符合预期的话, 理论上仅有thread 0在此处需要插入新的插件, 且不应该有错误 + * 对于其他线程这里应该直接检查ref id是否一致即可, 按理说不应该再插入新插件 + * 后续可以修改为根据线程号执行不同的处理流程 + */ + /* 如果在其他线程中已经完成过注册 */ + struct lua_plugin *search_plugin = NULL; + while ((search_plugin = utarray_next(plugin_env->plugin_array, search_plugin))) + { + if (search_plugin->plugin_id == plugin_id) + { + /* 初始化过程中已经进行过加载 */ + if (search_plugin->ctx_new_ref != ctx_new_id || search_plugin->ctx_free_ref != ctx_free_id) + { + LOGERROR("regist plugin, same id with different function ref"); + LOGERROR("plugin id %d, registed %d, %d, new ref %d, %d", plugin_id, + search_plugin->ctx_new_ref, search_plugin->ctx_free_ref, + ctx_new_id, ctx_free_id); + lua_settop(state, 0); + return 0; + } + lua_settop(state, 0); + lua_pushinteger(state, plugin_id); + return 1; + } + } + + /* 将注册完成的新插件插入到队列中 */ + struct lua_plugin new_plugin; + memset(&new_plugin, 0, sizeof(new_plugin)); + new_plugin.plugin_id = plugin_id; + new_plugin.ctx_new_ref = ctx_new_id; + new_plugin.ctx_free_ref = ctx_free_id; + utarray_push_back(plugin_env->plugin_array, &new_plugin); + plugin_env->plugin_count += 1; + + lua_settop(state, 0); + lua_pushinteger(state, plugin_id); + + return 1; +} + +/* ***** ***** ***** ***** ***** ***** */ +int lua_session_get_id(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 1) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + struct session *sess = (struct session *)lua_topointer(state, -1); + if (!sess) + { + lua_settop(state, 0); + return 0; + } + lua_pop(state, 1); + + lua_pushinteger(state, session_get_id(sess)); + return 1; +} + +int lua_session_set_id(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 2) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + int setid = lua_tointeger(state, -1); + lua_pop(state, 1); + struct session *sess = (struct session *)lua_topointer(state, -1); + lua_pop(state, 1); + + session_set_id(sess, setid); + return 0; +} + +int lua_session_get_type(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 1) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + struct session *sess = (struct session *)lua_topointer(state, -1); + if (!sess) + { + lua_settop(state, 0); + return 0; + } + lua_pop(state, 1); + + lua_pushinteger(state, session_get_type(sess)); + return 1; +} + +int lua_session_set_type(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 2) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + int settype = lua_tointeger(state, -1); + lua_pop(state, 1); + struct session *sess = (struct session *)lua_topointer(state, -1); + lua_pop(state, 1); + + session_set_id(sess, settype); + return 0; +} + +/* ***** ***** ***** ***** ***** ***** */ +/* + * TODO: 未完整考虑线程安全问题, 例如 + * 多个线程同时注册一个topic, 是否需要做处理等。 + */ +static UT_icd lua_plugin_mq_icd = {sizeof(struct lua_plugin_mq), NULL, NULL, NULL}; + +int lua_mq_create_topic(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 4) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TFUNCTION || + lua_type(state, -3) != LUA_TSTRING || lua_type(state, -4) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + /* 创建对该table的引用, 防止该table被回收 */ + int private_ref = luaL_ref(state, LUA_REGISTRYINDEX); /* stack top function */ + if (private_ref == LUA_REFNIL) + { + lua_settop(state, 0); + return 0; + } + + /* 导出free message的调用函数 */ + int free_ref = luaL_ref(state, LUA_REGISTRYINDEX); + if (free_ref == LUA_REFNIL) + { + luaL_unref(state, LUA_REGISTRYINDEX, private_ref); + lua_settop(state, 0); + return 0; + } + + /* 出栈队列名称 */ + char *name = strdup((char *)lua_tostring(state, -1)); + lua_pop(state, 1); + + /* 出栈传入的stellar */ + struct stellar *st = (struct stellar *)lua_topointer(state, -1); + if (!st) + { + luaL_unref(state, LUA_REGISTRYINDEX, free_ref); + luaL_unref(state, LUA_REGISTRYINDEX, private_ref); + if (name) + free(name); + lua_settop(state, 0); + return 0; + } + lua_settop(state, 0); + + /* 插入新的元素 */ + struct lua_message_mq new_mq; + memset(&new_mq, 0, sizeof(new_mq)); + utarray_push_back(global_schema->message_mq_array, &new_mq); + /* 从队列尾部取出最后一个元素 */ + struct lua_message_mq *mq = utarray_eltptr(global_schema->message_mq_array, (utarray_len(global_schema->message_mq_array) - 1)); + mq->freemessage_ref = free_ref; + mq->mq_private_ref = private_ref; + /* 调用stellar中mq的topic创建函数 */ + /* BugFix: 仔细看了代码, 在stellar中没有对name做拷贝处理, 这里name不能释放 */ + int topic_id = stellar_session_mq_create_topic(st, (const char *)name, lpm_message_free_func, mq); + /* 创建topic失败, 还原创建topic之前的状态 */ + if (topic_id < 0) + { + utarray_pop_back(global_schema->message_mq_array); + luaL_unref(state, LUA_REGISTRYINDEX, free_ref); + luaL_unref(state, LUA_REGISTRYINDEX, private_ref); + if (name) + free(name); + return 0; + } + + lua_rawgeti(state, LUA_REGISTRYINDEX, mq->mq_private_ref); +#if 0 + /* 不检查了, 直接覆盖 */ + /* 在传入的table中检查是否存在与mq_private_env同名的元素 */ + lua_getfield(state, -1, LUA_MQ_ENV_DEFAULT_KEY); /* stack top nil */ + if (lua_type(state, -1) != LUA_TNIL) { + lua_settop(state, 0); + return 0; + } + lua_pop(state, 1); /* stack top table */ +#endif + /* 在该table中加入新元素 */ + lua_pushlightuserdata(state, (void *)mq); /* stack top new_mq */ + lua_setfield(state, -2, LUA_MQ_ENV_DEFAULT_KEY); /* stack top table */ + mq->topic_id = topic_id; + + lua_settop(state, 0); + lua_pushinteger(state, topic_id); + + return 1; +} + +int lua_mq_get_topic_id(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 2) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TSTRING || lua_type(state, -2) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + /* 出栈队列名称 */ + char *name = strdup(lua_tostring(state, -1)); + lua_pop(state, 1); + + /* 出栈传入的stellar */ + struct stellar *st = (struct stellar *)lua_topointer(state, -1); + if (!st) + { + if (name) + free(name); + return 0; + } + lua_pop(state, 1); + + int topic_id = stellar_session_mq_get_topic_id(st, (const char *)name); + if (name) + free(name); + + lua_settop(state, 0); + lua_pushinteger(state, topic_id); + return 1; +} + +int lua_mq_update_topic(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 4) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TFUNCTION || + lua_type(state, -3) != LUA_TNUMBER || lua_type(state, -4) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + /* 创建对该table的引用, 防止该table被回收 */ + int private_ref = luaL_ref(state, LUA_REGISTRYINDEX); /* stack top function */ + if (private_ref == LUA_REFNIL) + { + lua_settop(state, 0); + return 0; + } + + /* 导出free message的调用函数 */ + int free_ref = luaL_ref(state, LUA_REGISTRYINDEX); + if (free_ref == LUA_REFNIL) + { + luaL_unref(state, LUA_REGISTRYINDEX, private_ref); + lua_settop(state, 0); + return 0; + } + + /* topic_id */ + int topic_id = lua_tointeger(state, -1); + lua_pop(state, 1); + + /* 出栈传入的stellar */ + struct stellar *st = (struct stellar *)lua_topointer(state, -1); + if (!st) + { + luaL_unref(state, LUA_REGISTRYINDEX, free_ref); + luaL_unref(state, LUA_REGISTRYINDEX, private_ref); + lua_settop(state, 0); + return 0; + } + lua_settop(state, 0); + + struct lua_message_mq *mq = search_message_mq_by_id(topic_id); + if (!mq || mq->topic_id != topic_id) + { + /* 如果topic不是lua创建的, 需要加入管理 */ + struct lua_message_mq new_mq; + memset(&new_mq, 0, sizeof(new_mq)); + utarray_push_back(global_schema->message_mq_array, &new_mq); + mq = utarray_eltptr(global_schema->message_mq_array, (utarray_len(global_schema->message_mq_array) - 1)); + + if (stellar_session_mq_update_topic(st, topic_id, lpm_message_free_func, mq)) + { + utarray_pop_back(global_schema->message_mq_array); + luaL_unref(state, LUA_REGISTRYINDEX, free_ref); + luaL_unref(state, LUA_REGISTRYINDEX, private_ref); + return 0; + } + + mq->freemessage_ref = free_ref; + mq->mq_private_ref = private_ref; + mq->topic_id = topic_id; + lua_pushboolean(state, 1); + return 1; + } + else + { + /* 本身是由lua创建的 */ + /* 更新private_ref */ + lua_rawgeti(state, LUA_REGISTRYINDEX, private_ref); + lua_pushlightuserdata(state, (void *)mq); /* stack top new_mq */ + lua_setfield(state, -2, LUA_MQ_ENV_DEFAULT_KEY); /* stack top table */ + luaL_unref(state, LUA_REGISTRYINDEX, mq->mq_private_ref); + mq->mq_private_ref = private_ref; + + /* 更新free function ref */ + luaL_unref(state, LUA_REGISTRYINDEX, mq->freemessage_ref); + mq->freemessage_ref = free_ref; + } + + lua_settop(state, 0); + lua_pushboolean(state, 1); + + return 1; +} + +int lua_mq_destory_topic(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 2) + { + lua_settop(state, 0); + return 0; + } + + /* topic_id */ + int topic_id = lua_tointeger(state, -1); + lua_pop(state, 1); + + /* 出栈传入的stellar */ + struct stellar *st = (struct stellar *)lua_topointer(state, -1); + if (!st) + { + lua_settop(state, 0); + return 0; + } + lua_settop(state, 0); + + /* 优先调用C函数进行卸载 */ + if (stellar_session_mq_destroy_topic(st, topic_id) < 0) + { + return 0; + } + + /* 不方便删除, 将id置为-1, 确保匹配不到 */ + struct lua_message_mq *mq = search_message_mq_by_id(topic_id); + if (mq) + { + mq->topic_id = -1; + luaL_unref(state, LUA_REGISTRYINDEX, mq->mq_private_ref); + luaL_unref(state, LUA_REGISTRYINDEX, mq->freemessage_ref); + mq->freemessage_ref = 0; + mq->mq_private_ref = 0; + } + + lua_pushboolean(state, 1); + return 1; +} + +int lua_mq_subscribe_topic(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 2) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TFUNCTION || + lua_type(state, -3) != LUA_TNUMBER || lua_type(state, -4) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + /* 读取参数 */ + int plugin_id = lua_tointeger(state, -1); + lua_pop(state, 1); + + int on_message_ref = luaL_ref(state, -1); + if (on_message_ref == LUA_REFNIL) + { + lua_settop(state, 0); + return 0; + } + + int topic_id = lua_tointeger(state, -1); + lua_pop(state, 1); + + struct stellar *st = (struct stellar *)lua_topointer(state, -1); + if (!st) + { + lua_settop(state, 0); + return 0; + } + lua_pop(state, 1); + + lua_settop(state, 0); + if (stellar_session_mq_subscribe(st, topic_id, lpm_on_session_msg_func, plugin_id)) + { + + /* 订阅失败, 返回false */ + lua_pushboolean(state, 0); + } + else + { + struct lua_plugin *plugin = search_plugin_by_id(plugin_id); + if (plugin) + { + if (!plugin->sub_topic_array) + { + /* 该插件尚未注册任何topic, 注册第一个topic时将创建接收的topic列表 */ + utarray_new(plugin->sub_topic_array, &lua_plugin_mq_icd); + } + else + { + /* 如果该插件中之前已经订阅过该消息, 更新message函数 */ + struct lua_plugin_mq *mq = NULL; + while ((mq = utarray_next(plugin->sub_topic_array, mq))) + { + if (mq->topic_id == topic_id) + { + luaL_unref(state, LUA_REGISTRYINDEX, mq->onmessage_ref); + mq->onmessage_ref = on_message_ref; + lua_pushboolean(state, 1); + return 1; + } + } + } + + struct lua_plugin_mq new_mq; + memset(&new_mq, 0, sizeof(new_mq)); + new_mq.topic_id = topic_id; + new_mq.onmessage_ref = on_message_ref; + utarray_push_back(plugin->sub_topic_array, &new_mq); + } + /* 订阅成功, 返回true */ + lua_pushboolean(state, 1); + } + + return 1; +} + +int lua_mq_publish_message(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 3) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TNUMBER || + lua_type(state, -3) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + /* 倒序依次获取参数 */ + int mess_ref = luaL_ref(state, LUA_REGISTRYINDEX); + if (mess_ref == LUA_REFNIL) + { + lua_settop(state, 0); + return 0; + } + + int topic_id = lua_tointeger(state, -1); + lua_pop(state, 1); + + struct session *sess = (struct session *)lua_topointer(state, -1); + if (!sess) + { + luaL_unref(state, LUA_REGISTRYINDEX, mess_ref); + lua_settop(state, 0); + return 0; + } + lua_settop(state, 0); + + /* 创建一段数据引用 */ + struct lua_context *new_context = (struct lua_context *)calloc(1, sizeof(struct lua_context)); + if (__glibc_unlikely(!new_context)) + { + luaL_unref(state, LUA_REGISTRYINDEX, mess_ref); + return 0; + } + new_context->context_ref_id = mess_ref; + + /* 调用C接口发布消息 */ + if (session_mq_publish_message(sess, topic_id, new_context)) + { + luaL_unref(state, LUA_REGISTRYINDEX, new_context->context_ref_id); + free(new_context); + lua_pushboolean(state, 0); + } + else + { + lua_pushboolean(state, 1); + } + + return 1; +} + +int lua_mq_ignore_message(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 4) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TNUMBER || + lua_type(state, -3) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + /* 倒序获取参数 */ + int plugin_id = lua_tointeger(state, -1); + lua_pop(state, 1); + + int topic_id = lua_tointeger(state, -1); + lua_pop(state, 1); + + struct session * sess = (struct session *)lua_topointer(state, -1); + if ( !sess ) { + lua_settop(state, 0); + return 0; + } + lua_settop(state, 0); + + if (session_mq_ignore_message(sess, topic_id, plugin_id)) + lua_pushboolean(state, 0); + else + lua_pushboolean(state, 1); + + return 1; +} + +int lua_mq_unignore_message(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 4) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TNUMBER || + lua_type(state, -3) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + /* 倒序获取参数 */ + int plugin_id = lua_tointeger(state, -1); + lua_pop(state, 1); + + int topic_id = lua_tointeger(state, -1); + lua_pop(state, 1); + + struct session * sess = (struct session *)lua_topointer(state, -1); + if ( !sess ) { + lua_settop(state, 0); + return 0; + } + lua_settop(state, 0); + + if (session_mq_unignore_message(sess, topic_id, plugin_id)) + lua_pushboolean(state, 0); + else + lua_pushboolean(state, 1); + + return 1; +} + +int lua_mq_topic_is_active(lua_State *state) +{ + /* 参数个数检查 */ + if (lua_gettop(state) != 4) + { + lua_settop(state, 0); + return 0; + } + + /* 参数类型检查 */ + if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TLIGHTUSERDATA) + { + lua_settop(state, 0); + return 0; + } + + /* 倒序获取参数 */ + int topic_id = lua_tointeger(state, -1); + lua_pop(state, 1); + + struct session * sess = (struct session *)lua_topointer(state, -1); + if ( !sess ) { + lua_settop(state, 0); + return 0; + } + lua_settop(state, 0); + + /* 1 means active */ + if (session_mq_topic_is_active(sess, topic_id) == 1) + lua_pushboolean(state, 1); + else + lua_pushboolean(state, 0); + + return 1; +} \ No newline at end of file diff --git a/src/lua_binding_functions.h b/src/lua_binding_functions.h new file mode 100644 index 0000000..f051e2e --- /dev/null +++ b/src/lua_binding_functions.h @@ -0,0 +1,52 @@ +/************************************************************************* + > File Name: lua_binding_functions.h + > Author: + > Created Time: 2024-08 + > Encoding : UTF-8 + ************************************************************************/ + +/************************************************************************* + * 声明并定义所有需要在lua状态机中绑定的函数 + * version + * [ v0.1 ] + * 08-14 + * 1. 新增函数声明 + * 新增插件注册函数 + * int lua_plugin_manage_regist + * 新增会话相关函数 + * int lua_session_get_id + * int lua_session_set_id + * int lua_session_get_type + * int lua_session_set_type + * 新增message相关函数 + * int lua_mq_create_topic + * int lua_mq_get_topic_id + * int lua_mq_update_topic + * int lua_mq_destory_topic + * int lua_mq_subscribe_topic + * int lua_mq_topic_is_active + * int lua_mq_publish_message + * int lua_mq_ignore_message + * int lua_mq_unignore_message + ************************************************************************/ +#include "lua_plugin_manage_internal.h" + +/* 需要注册至lua中供lua调用的所有函数原型 */ +int lua_plugin_manage_regist(lua_State *state); + +/* 与struct session结构相关的函数 */ +int lua_session_get_id(lua_State *state); +int lua_session_set_id(lua_State *state); +int lua_session_get_type(lua_State *state); +int lua_session_set_type(lua_State *state); + +/* 与stellar message mq相关的函数 */ +int lua_mq_create_topic(lua_State * state); +int lua_mq_get_topic_id(lua_State * state); +int lua_mq_update_topic(lua_State * state); +int lua_mq_destory_topic(lua_State * state); +int lua_mq_subscribe_topic(lua_State * state); +int lua_mq_topic_is_active(lua_State * state); +int lua_mq_publish_message(lua_State * state); +int lua_mq_ignore_message(lua_State * state); +int lua_mq_unignore_message(lua_State * state); diff --git a/src/lua_plugin_binding.c b/src/lua_plugin_binding.c index f72cb43..a252a64 100644 --- a/src/lua_plugin_binding.c +++ b/src/lua_plugin_binding.c @@ -22,19 +22,16 @@ * 08-12 * 1. 修改函数lua_cbinding_function, 参数与lua_cbinding_data保持统一 * 2. 修改部分函数返回值, 使用枚举代替错误码返回值, 方便统一处理 + * + * 08-14 + * 1. 将所有待注册函数移动至新文件中, 不再此文件中实现 ************************************************************************/ #include "lua_plugin_manage_internal.h" +#include "lua_binding_functions.h" #include #include -/* 需要注册至lua中供lua调用的所有函数原型 */ -int lua_plugin_manage_regist(lua_State *state); -int lua_session_get_id(lua_State *state); -int lua_session_set_id(lua_State *state); -int lua_session_get_type(lua_State *state); -int lua_session_set_type(lua_State *state); - /* 需要注册至状态机中的函数定义在链表中, 会依次完成注册 */ struct lua_binding_function lua_bind_functions[] = { {lua_plugin_manage_regist, "register", "plugin_manage"}, @@ -42,6 +39,15 @@ struct lua_binding_function lua_bind_functions[] = { {lua_session_set_id, "setid", "session"}, {lua_session_get_type, "gettype", "session"}, {lua_session_set_type, "settype", "session"}, + {lua_mq_create_topic, "createtopic", "message"}, + {lua_mq_get_topic_id, "gettopicid", "message"}, + {lua_mq_update_topic, "updatetopic", "message"}, + {lua_mq_destory_topic, "destorytopic", "message"}, + {lua_mq_subscribe_topic, "subscribetopic", "message"}, + {lua_mq_topic_is_active, "topicisactive", "message"}, + {lua_mq_publish_message, "publishmessage", "message"}, + {lua_mq_ignore_message, "ignoremessage", "message"}, + {lua_mq_unignore_message, "unignoremessage", "message"}, {NULL, NULL, NULL}, }; @@ -464,211 +470,3 @@ int lua_cbinding_datas(lua_State *state) } return failed_count; } - -/* ***** ***** ***** ***** ***** ***** */ -int lua_plugin_manage_regist(lua_State *state) -{ - /* 参数个数检查 */ - if (lua_gettop(state) != 4) - { - lua_settop(state, 0); - return 0; - } - - /* 参数类型检查 */ - if (lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TFUNCTION || - lua_type(state, -3) != LUA_TFUNCTION || lua_type(state, -4) != LUA_TLIGHTUSERDATA) - { - lua_settop(state, 0); - return 0; - } - - /* 取出处理第四个参数 */ - lua_getfield(state, -1, LUA_PLUGIN_ENV_DEFAULT_KEY); /* stack 4, table中取出对应结构的指针 */ - if (lua_type(state, -1) != LUA_TLIGHTUSERDATA) - { - lua_settop(state, 0); - return 0; - } - struct lua_model *plugin_env = (struct lua_model *)lua_topointer(state, -1); - lua_pop(state, 2); - // debug_lua_state_stack(state, 0, "here"); - // printf("env pointer is %p\n", plugin_env); - - /* 取出处理第三个参数 */ - int ctx_free_id = luaL_ref(state, LUA_REGISTRYINDEX); /* stack 3 */ - if (ctx_free_id == LUA_REFNIL) - { - lua_settop(state, 0); - return 0; - } - - /* 取出处理第二个参数 */ - int ctx_new_id = luaL_ref(state, LUA_REGISTRYINDEX); /* stack 2 */ - if (ctx_new_id == LUA_REFNIL) - { - lua_settop(state, 0); - return 0; - } - - /* 取出处理第一个参数 */ - struct stellar *st = (struct stellar *)lua_topointer(state, -1); /* stack 1 */ - if (!st) - { - lua_settop(state, 0); - return 0; - } - lua_pop(state, 1); - - /* 在stellar中注册, 获取注册id */ - int plugin_id = stellar_session_plugin_register(st, lpm_ctx_new_func, lpm_ctx_free_func, (void *)plugin_env); -#ifdef LUAPLUGIN_BASIC_UNITTEST - LOGDEBUG("now regist new plugin, plugin id is %d, %d, %d\n", plugin_id, ctx_new_id, ctx_free_id); -#endif - - /* TODO: 如果运行完全符合预期的话, 理论上仅有thread 0在此处需要插入新的插件, 且不应该有错误 - * 对于其他线程这里应该直接检查ref id是否一致即可, 按理说不应该再插入新插件 - * 后续可以修改为根据线程号执行不同的处理流程 - */ - /* 如果在其他线程中已经完成过注册 */ - struct lua_plugin *search_plugin = NULL; - while ((search_plugin = utarray_next(plugin_env->plugin_array, search_plugin))) - { - if (search_plugin->plugin_id == plugin_id) - { - /* 初始化过程中已经进行过加载 */ - if (search_plugin->ctx_new_ref != ctx_new_id || search_plugin->ctx_free_ref != ctx_free_id) - { - LOGERROR("regist plugin, same id with different function ref"); - LOGERROR("plugin id %d, registed %d, %d, new ref %d, %d", plugin_id, - search_plugin->ctx_new_ref, search_plugin->ctx_free_ref, - ctx_new_id, ctx_free_id); - lua_settop(state, 0); - return 0; - } - lua_settop(state, 0); - lua_pushinteger(state, plugin_id); - return 1; - } - } - - /* 将注册完成的新插件插入到队列中 */ - struct lua_plugin new_plugin; - memset(&new_plugin, 0, sizeof(new_plugin)); - new_plugin.plugin_id = plugin_id; - new_plugin.ctx_new_ref = ctx_new_id; - new_plugin.ctx_free_ref = ctx_free_id; - utarray_push_back(plugin_env->plugin_array, &new_plugin); - plugin_env->plugin_count += 1; - - lua_settop(state, 0); - lua_pushinteger(state, plugin_id); - - return 1; -} - -int lua_session_get_id(lua_State *state) -{ - /* 参数个数检查 */ - if (lua_gettop(state) != 1) - { - lua_settop(state, 0); - return 0; - } - - /* 参数类型检查 */ - if (lua_type(state, -1) != LUA_TLIGHTUSERDATA) - { - lua_settop(state, 0); - return 0; - } - - struct session *sess = (struct session *)lua_topointer(state, -1); - if (!sess) - { - lua_settop(state, 0); - return 0; - } - lua_pop(state, 1); - - lua_pushinteger(state, session_get_id(sess)); - return 1; -} - -int lua_session_set_id(lua_State *state) -{ - /* 参数个数检查 */ - if (lua_gettop(state) != 2) - { - lua_settop(state, 0); - return 0; - } - - /* 参数类型检查 */ - if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TLIGHTUSERDATA) - { - lua_settop(state, 0); - return 0; - } - - int setid = lua_tointeger(state, -1); - lua_pop(state, 1); - struct session *sess = (struct session *)lua_topointer(state, -1); - lua_pop(state, 1); - - session_set_id(sess, setid); - return 0; -} - -int lua_session_get_type(lua_State *state) -{ - /* 参数个数检查 */ - if (lua_gettop(state) != 1) - { - lua_settop(state, 0); - return 0; - } - - /* 参数类型检查 */ - if (lua_type(state, -1) != LUA_TLIGHTUSERDATA) - { - lua_settop(state, 0); - return 0; - } - - struct session *sess = (struct session *)lua_topointer(state, -1); - if (!sess) - { - lua_settop(state, 0); - return 0; - } - lua_pop(state, 1); - - lua_pushinteger(state, session_get_type(sess)); - return 1; -} - -int lua_session_set_type(lua_State *state) -{ - /* 参数个数检查 */ - if (lua_gettop(state) != 2) - { - lua_settop(state, 0); - return 0; - } - - /* 参数类型检查 */ - if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TLIGHTUSERDATA) - { - lua_settop(state, 0); - return 0; - } - - int settype = lua_tointeger(state, -1); - lua_pop(state, 1); - struct session *sess = (struct session *)lua_topointer(state, -1); - lua_pop(state, 1); - - session_set_id(sess, settype); - return 0; -} \ No newline at end of file diff --git a/src/lua_plugin_cfunc.c b/src/lua_plugin_cfunc.c index 2ea5fb7..9fbb646 100644 --- a/src/lua_plugin_cfunc.c +++ b/src/lua_plugin_cfunc.c @@ -12,6 +12,14 @@ * 1. 实现函数 * void *lpm_ctx_new_func; * void lpm_ctx_free_func; + * + * 08-13 + * 1. 由于context结构体修改, 部分函数调用及处理逻辑需要同步修改 + * + * 08-15 + * 1. 实现函数 + * void lpm_message_free_func + * void lpm_on_session_msg_func ************************************************************************/ #include "lua_plugin_manage_internal.h" @@ -48,9 +56,9 @@ void *lpm_ctx_new_func( /* 获取当前的线程id并找到该线程对应的state */ int thread_id = session_get_threadid(sess); - if ( thread_id > global_schema->state_count ) + if (thread_id > global_schema->state_count) return NULL; - lua_State * state = global_schema->thread_state[thread_id - 1]; + lua_State *state = global_schema->thread_state[thread_id]; struct lua_context *new_context = lua_context_new(state); if (__glibc_unlikely(!new_context)) /* 创建新的context失败 */ @@ -67,7 +75,7 @@ void *lpm_ctx_new_func( if (lua_chunk_execute(state, plugin->ctx_new_ref, 3, param, 0, NULL)) { /* 脚本执行失败 */ - free(new_context); + lua_context_free(state, new_context); return NULL; } @@ -90,7 +98,6 @@ void lpm_ctx_free_func( { if (__glibc_unlikely(!sess || !sess_ctx || !plugin_env)) return; - struct lua_context *context = (struct lua_context *)sess_ctx; struct lua_model *env = (struct lua_model *)plugin_env; /* 获取插件ID并找到该插件 */ @@ -107,20 +114,138 @@ void lpm_ctx_free_func( return; int thread_id = session_get_threadid(sess); - if ( thread_id > global_schema->state_count ) + if (thread_id > global_schema->state_count) return; - lua_State * state = global_schema->thread_state[thread_id]; + lua_State *state = global_schema->thread_state[thread_id]; struct lua_cdata param[3] = {0}; param[0].cdata_type = DATATYPE_POINTER; param[0].cdata_pointer = sess; param[1].cdata_type = DATATYPE_CONTEXT; - param[1].cdata_context = context; + param[1].cdata_context = (struct lua_context *)sess_ctx; param[2].cdata_type = DATATYPE_TABLE; param[2].cdata_table = env->private_env_ref; lua_chunk_execute(state, plugin->ctx_free_ref, 3, param, 0, NULL); - lua_context_free(state, context); + lua_context_free(state, (struct lua_context *)sess_ctx); return; } + +/* + * Function: lpm_message_free_func + * Input: | struct session * | sess | 释放message对应的session + * | void * | msg | 需要释放的message消息 + * | void * | msg_free_arg | 释放函数需要存储的私有数据 + * Output: + * Return: + * Description: 与C插件保持一致的session_msg_free_cb_func, 作为lua插件实现对应功能的通用函数 + */ +void lpm_message_free_func( + struct session *sess, + void *msg, + void *msg_free_arg) +{ + if (__glibc_unlikely(!sess || !msg || !msg_free_arg)) + return; + struct lua_message_mq *plugin_mq = (struct lua_message_mq *)msg_free_arg; + + int thread_id = session_get_threadid(sess); + if (thread_id > global_schema->state_count) + return; + lua_State *state = global_schema->thread_state[thread_id]; + + struct lua_cdata param[3] = {0}; + param[0].cdata_type = DATATYPE_POINTER; + param[0].cdata_pointer = sess; + param[1].cdata_type = DATATYPE_CONTEXT; + param[1].cdata_context = (struct lua_context *)msg; + param[2].cdata_type = DATATYPE_TABLE; + param[2].cdata_table = plugin_mq->mq_private_ref; + + lua_chunk_execute(state, plugin_mq->freemessage_ref, 3, param, 0, NULL); + lua_context_free(state, (struct lua_context *)msg); + + return; +} + +/* + * Function: lpm_on_session_msg_func + * Input: | struct session * | sess | 会话信息 + * | int | topic_id | 处理的message topic id + * | const void * | msg | 需要处理的消息信息 + * | void * | sess_ctx | 会话中的私有数据 + * | void * | plugin_env | 插件运行环境 + * Output: + * Return: + * Description: 与C插件管理器保持一致的ctx_free_func + */ +void lpm_on_session_msg_func( + struct session *sess, + int topic_id, + const void *msg, + void *sess_ctx, + void *plugin_env) +{ + if (__glibc_unlikely(!sess || !sess || !plugin_env)) + return; + + struct lua_context *sess_context = (struct lua_context *)sess_ctx; + struct lua_model *env = (struct lua_model *)plugin_env; + + /* 获取插件ID并找到该插件 */ + int plugin_id = session_get_pluginid(sess); + // int plugin_id = 1; + struct lua_plugin *plugin = NULL; + while ((plugin = utarray_next(env->plugin_array, plugin))) + { + if (plugin->plugin_id == plugin_id) + break; + } + if (!plugin || plugin->plugin_id != plugin_id || !plugin->sub_topic_array) + /* 未找到该插件, 或者在该插件中未发现注册的topic_id */ + return; + struct lua_plugin_mq *mq = NULL; + while ((mq = utarray_next(plugin->sub_topic_array, mq))) + { + if (mq->topic_id == topic_id) + break; + } + if (!mq || mq->topic_id != topic_id) + /* 未找到对应的消息处理函数 */ + return; + + /* 判断该消息是由C端插件产生的还是由Lua插件产生的 */ + struct lua_message_mq *message_mq = search_message_mq_by_id(topic_id); + + int thread_id = session_get_threadid(sess); + if (thread_id > global_schema->state_count) + return; + lua_State *state = global_schema->thread_state[thread_id]; + + struct lua_cdata params[5] = {0}; + params[0].cdata_type = DATATYPE_POINTER; + params[0].cdata_pointer = sess; + params[1].cdata_type = DATATYPE_INT; + params[1].cdata_int = topic_id; + if (!message_mq) + { + /* C端产生的直接使用指针 */ + params[2].cdata_type = DATATYPE_POINTER; + params[2].cdata_pointer = (void *)msg; + } + else + { + /* lua端产生的为一个context结构 */ + params[2].cdata_type = DATATYPE_CONTEXT; + params[2].cdata_context = (struct lua_context *)msg; + } + params[3].cdata_type = DATATYPE_CONTEXT; + params[3].cdata_context = sess_context; + params[4].cdata_type = DATATYPE_TABLE; + params[4].cdata_table = env->private_env_ref; + + lua_chunk_execute(state, mq->onmessage_ref, 5, params, 0, NULL); + + return; +} \ No newline at end of file diff --git a/src/lua_plugin_manage.c b/src/lua_plugin_manage.c index d983a20..657ad33 100644 --- a/src/lua_plugin_manage.c +++ b/src/lua_plugin_manage.c @@ -29,6 +29,12 @@ * 08-06 * 1. 实现函数 * int script_execute; + * + * 08-13 + * 1. 修改部分创建列表使用的结构 + * 2. 实现函数 + * struct lua_plugin * search_plugin_by_id + * struct lua_message_mq * search_message_mq_by_id ************************************************************************/ #include "lua_plugin_manage_internal.h" @@ -39,6 +45,68 @@ struct lua_plugin_manage_schema * global_schema = NULL; +#if 0 +void lua_message_mq_destory(void * elt) +{ + if (!elt) return; + struct lua_message_mq * mq = (struct lua_message_mq *)elt; + if ( mq->topic_name ) + free(mq->topic_name); + return; +} +#endif +static UT_icd lua_message_mq_icd = {sizeof(struct lua_message_mq), NULL, NULL, NULL}; + +void lua_plugin_destory(void *elt) +{ + if ( !elt ) return; + struct lua_plugin * plugin = (struct lua_plugin *)elt; + if ( plugin->sub_topic_array ) + utarray_free(plugin->sub_topic_array); + return; +} +static UT_icd lua_plugin_icd = {sizeof(struct lua_plugin), NULL, NULL, lua_plugin_destory}; + +/* + * Function: search_plugin_by_id + * Input: | int | plugin_id | 需要查找plugin数据的id + * Output: + * Return: 查找得到的plugin结构 + * Description: 在global schema中根据plugin_id查找一个plugin + */ +struct lua_plugin * search_plugin_by_id(int plugin_id) +{ + for ( int i = 0; i < global_schema->model_count; ++i) { + struct lua_model * model = &global_schema->model[i]; + + struct lua_plugin * plugin = NULL; + while ( (plugin = utarray_next(model->plugin_array, plugin)) ) { + if ( plugin->plugin_id == plugin_id ) + return plugin; + else if (plugin->plugin_id > plugin_id) + return NULL; + } + } + return NULL; +} + +/* + * Function: search_message_mq_by_id + * Input: | int | topic_id | 需要查找的topic_id + * Output: + * Return: 查找得到的message_mq结构 + * Description: 在global schema中根据topic_id查找一个message_mq + */ +struct lua_message_mq * search_message_mq_by_id(int topic_id) +{ + struct lua_message_mq * mq = NULL; + while ((mq = utarray_next(global_schema->message_mq_array, mq))) { + if ( mq->topic_id == topic_id ) + return mq; + } + return NULL; +} + /* * Function: thread_state_init * Input: | int | thread_id | 创建状态机的线程号 @@ -166,8 +234,6 @@ int thread_state_load_specific( return SUCCESS; } -static UT_icd lua_plugin_icd = {sizeof(struct lua_plugin), NULL, NULL, NULL}; - /* * Function: thread_state_call_load * Input: | lua_State * | state | 进行模块加载的状态机 @@ -313,6 +379,11 @@ struct lua_plugin_manage_schema *lua_plugin_manage_init( } } } + + /* 可能运行过程中创建新的topic, 此处进行初始化 */ + new_schema->mq_count = 0; + utarray_new(new_schema->message_mq_array, &lua_message_mq_icd); + debug_lua_plugin_manage_schema(new_schema); global_schema = new_schema; @@ -355,6 +426,9 @@ void lua_plugin_manage_exit(struct lua_plugin_manage_schema *lua_plug_mgr) } free(lua_plug_mgr->model); + if ( lua_plug_mgr->message_mq_array ) + utarray_free(lua_plug_mgr->message_mq_array); + free(lua_plug_mgr); return; } diff --git a/src/lua_plugin_manage_internal.h b/src/lua_plugin_manage_internal.h index fdb763b..f945bff 100644 --- a/src/lua_plugin_manage_internal.h +++ b/src/lua_plugin_manage_internal.h @@ -74,6 +74,22 @@ * 整体重构状态机相关结构定义, 拆分高频访问数据与低频访问数据 * 状态机中数据管理由树形结构修改为数组型结构 * 经过多次验证, 相同状态机在执行相同操作后返回结果及中间产物一致, 合并一些冗余数据 + * 1. 声明并定义结构 + * struct lua_plugin_statistics; + * struct lua_plugin; + * struct lua_model; + * struct lua_plugin_manage_schema; + * + * 08-14 + * 新增message相关结构定义 + * 1. 声明并定义结构 + * struct lua_plugin_mq; + * struct lua_message_mq; + * 2. 声明函数 + * void lpm_message_free_func + * void lpm_on_session_msg_func + * struct lua_plugin * search_plugin_by_id + * struct lua_message_mq * search_message_mq_by_id ************************************************************************/ #ifndef LUA_PLUGIN_MANAGE_INTERNAL_H #define LUA_PLUGIN_MANAGE_INTERNAL_H @@ -209,7 +225,7 @@ void lua_cdata_destory(struct lua_cdata *cdata); /* 上下文结构, 保存临时数据 */ struct lua_context { - lua_State *context_state; + // lua_State *context_state; int context_ref_id; }; @@ -224,7 +240,8 @@ void lua_context_free(lua_State *state, struct lua_context *context); /* 此部分为注册至C中的lua通用函数, 实现在lua_plugin_cfunc.c中 */ void *lpm_ctx_new_func(struct session *sess, void *plugin_env); void lpm_ctx_free_func(struct session *sess, void *sess_ctx, void *plugin_env); -// void lpm_on_session_msg_func(struct session *sess, int topic_id, const void *msg, void *sess_ctx, void *plugin_env); +void lpm_message_free_func(struct session *sess, void *msg, void *msg_free_arg); +void lpm_on_session_msg_func(struct session *sess, int topic_id, const void *msg, void *sess_ctx, void *plugin_env); /* ***** ***** ***** ***** ***** ***** */ /* lua代码块相关操作, 实现在lua_plugin_chunk.c中 */ @@ -233,7 +250,6 @@ int lua_chunk_execute(lua_State *state, int ref_id, int pcount, struct lua_cdata /* ***** ***** ***** ***** ***** ***** */ /* 状态机相关的一些数据结构及操作, 实现在lua_plugin_manage.c中 */ - extern struct lua_plugin_manage_schema *global_schema; /* TODO:统计插件的运行情况, 暂时没想好怎么用 */ @@ -250,6 +266,17 @@ struct lua_plugin_statistics int free_failed_count; }; +#define LUA_MQ_ENV_DEFAULT_KEY "__mqenv_pointer" +#define LUA_MQ_TOPIC_ID_KEY "topic_id" +/* 保存lua插件注册的消息队列信息 */ +struct lua_plugin_mq +{ + /* 消息队列ID, 消息队列可能是订阅得到或者创建得到, 此处保存对应topic信息 */ + int topic_id; + /* 如果是订阅的topic必须包含处理函数 */ + int onmessage_ref; +}; + /* 保存Lua插件信息 */ struct lua_plugin { @@ -259,11 +286,13 @@ struct lua_plugin int ctx_new_ref; /* context_free函数在状态机中的引用值 */ int ctx_free_ref; + UT_array * sub_topic_array; }; #define MODEL_MARK_INIT_DONE 0x0001 #define MODEL_MARK_LOAD_DONE 0x0002 +#define LUA_PLUGIN_ENV_DEFAULT_KEY "__penv_pointer" /* 加载的lua模块, 一个lua模块一般来说对应一个lua文件, 与C插件管理中的so相同 */ struct lua_model { @@ -281,6 +310,19 @@ struct lua_model unsigned short plugin_count; }; +/* 由lua创建的topic结构, 该结构保存在schema中 */ +struct lua_message_mq +{ + /* 消息队列ID, 消息队列可能是订阅得到或者创建得到, 此处保存对应topic信息 */ + int topic_id; + /* 如果是新创建的topic, 必须包含释放函数 */ + int freemessage_ref; + /* 创建一个消息处理私有数据, 对应session_mq中的msg_free_arg */ + int mq_private_ref; + // char * topic_name; +}; + +#define LUA_STATE_THREAD_ID_KEY "__thread_id" struct lua_plugin_manage_schema { struct stellar *st; @@ -292,10 +334,14 @@ struct lua_plugin_manage_schema /* */ int model_count; struct lua_model *model; + + int mq_count; + /* TODO: 创建的所有message topic id理论上应该是连续的, 可以考虑用hash数组, 寻址能更快 */ + UT_array * message_mq_array; }; -#define LUA_PLUGIN_ENV_DEFAULT_KEY "__penv_pointer" -#define LUA_STATE_THREAD_ID_KEY "__thread_id" +struct lua_plugin * search_plugin_by_id(int plugin_id); +struct lua_message_mq * search_message_mq_by_id(int topic_id); #ifdef LUAPLUGIN_BASIC_UNITTEST void debug_lua_state_stack(lua_State *state, int mod, const char *message);