【新增】增加message处理相关lua接口函数

This commit is contained in:
niubinghui
2024-08-15 17:39:28 +08:00
parent 88a34dec4d
commit 3b2baf2806
8 changed files with 1118 additions and 236 deletions

View File

@@ -14,6 +14,9 @@
* lua_plugin_manage_exit
* 2. 声明数据结构
* struct lua_plugin_manage_schema
*
* 08-09
* 1. 修改参数函数原型, 传入参数修改为已经加载的配置信息
************************************************************************/
#pragma once

View File

@@ -5,16 +5,18 @@ TARGET=libluaplugin.so
TEST_FLAG = -DLUAPLUGIN_BASIC_UNITTEST
SRC := lua_plugin_binding.c \
lua_plugin_data.c \
lua_plugin_cfunc.c \
SRC := lua_plugin_data.c \
lua_plugin_chunk.c \
lua_plugin_cfunc.c \
lua_binding_functions.c \
lua_plugin_binding.c \
lua_plugin_manage.c
OBJECTS := lua_plugin_binding.o \
lua_plugin_data.o \
lua_plugin_cfunc.o \
OBJECTS := lua_plugin_data.o \
lua_plugin_chunk.o \
lua_plugin_cfunc.o \
lua_binding_functions.o \
lua_plugin_binding.o \
lua_plugin_manage.o
INCLUDE = -I$(TOPDIR)/dependence/include -I$(TOPDIR)/include

782
src/lua_binding_functions.c Normal file
View File

@@ -0,0 +1,782 @@
/*************************************************************************
> File Name: lua_binding_functions.c
> Author:
> Created Time: 2024-08
> Encoding : UTF-8
************************************************************************/
/*************************************************************************
* 声明并定义所有需要在lua状态机中绑定的函数
* version
* [ v0.1 ]
* 08-14
* 1. 实现函数
* 新增插件注册函数
* int lua_plugin_manage_regist
* 新增会话相关函数
* int lua_session_get_id
* int lua_session_set_id
* int lua_session_get_type
* int lua_session_set_type
* 新增message相关函数
* int lua_mq_create_topic
* int lua_mq_get_topic_id
* int lua_mq_update_topic
* int lua_mq_destory_topic
* int lua_mq_subscribe_topic
* int lua_mq_topic_is_active
* int lua_mq_publish_message
* int lua_mq_ignore_message
* int lua_mq_unignore_message
************************************************************************/
#include "lua_plugin_manage_internal.h"
#include "session_mq.h"
/* ***** ***** ***** ***** ***** ***** */
int lua_plugin_manage_regist(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 4)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TFUNCTION ||
lua_type(state, -3) != LUA_TFUNCTION || lua_type(state, -4) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
/* 取出处理第四个参数 */
lua_getfield(state, -1, LUA_PLUGIN_ENV_DEFAULT_KEY); /* stack 4, table中取出对应结构的指针 */
if (lua_type(state, -1) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
struct lua_model *plugin_env = (struct lua_model *)lua_topointer(state, -1);
lua_pop(state, 2);
// debug_lua_state_stack(state, 0, "here");
// printf("env pointer is %p\n", plugin_env);
/* 取出处理第三个参数 */
int ctx_free_id = luaL_ref(state, LUA_REGISTRYINDEX); /* stack 3 */
if (ctx_free_id == LUA_REFNIL)
{
lua_settop(state, 0);
return 0;
}
/* 取出处理第二个参数 */
int ctx_new_id = luaL_ref(state, LUA_REGISTRYINDEX); /* stack 2 */
if (ctx_new_id == LUA_REFNIL)
{
luaL_unref(state, LUA_REGISTRYINDEX, ctx_free_id);
lua_settop(state, 0);
return 0;
}
/* 取出处理第一个参数 */
struct stellar *st = (struct stellar *)lua_topointer(state, -1); /* stack 1 */
if (!st)
{
luaL_unref(state, LUA_REGISTRYINDEX, ctx_new_id);
luaL_unref(state, LUA_REGISTRYINDEX, ctx_free_id);
lua_settop(state, 0);
return 0;
}
lua_pop(state, 1);
/* 在stellar中注册, 获取注册id */
int plugin_id = stellar_session_plugin_register(st, lpm_ctx_new_func, lpm_ctx_free_func, (void *)plugin_env);
#ifdef LUAPLUGIN_BASIC_UNITTEST
LOGDEBUG("now regist new plugin, plugin id is %d, %d, %d\n", plugin_id, ctx_new_id, ctx_free_id);
#endif
/* TODO: 如果运行完全符合预期的话, 理论上仅有thread 0在此处需要插入新的插件, 且不应该有错误
* 对于其他线程这里应该直接检查ref id是否一致即可, 按理说不应该再插入新插件
* 后续可以修改为根据线程号执行不同的处理流程
*/
/* 如果在其他线程中已经完成过注册 */
struct lua_plugin *search_plugin = NULL;
while ((search_plugin = utarray_next(plugin_env->plugin_array, search_plugin)))
{
if (search_plugin->plugin_id == plugin_id)
{
/* 初始化过程中已经进行过加载 */
if (search_plugin->ctx_new_ref != ctx_new_id || search_plugin->ctx_free_ref != ctx_free_id)
{
LOGERROR("regist plugin, same id with different function ref");
LOGERROR("plugin id %d, registed %d, %d, new ref %d, %d", plugin_id,
search_plugin->ctx_new_ref, search_plugin->ctx_free_ref,
ctx_new_id, ctx_free_id);
lua_settop(state, 0);
return 0;
}
lua_settop(state, 0);
lua_pushinteger(state, plugin_id);
return 1;
}
}
/* 将注册完成的新插件插入到队列中 */
struct lua_plugin new_plugin;
memset(&new_plugin, 0, sizeof(new_plugin));
new_plugin.plugin_id = plugin_id;
new_plugin.ctx_new_ref = ctx_new_id;
new_plugin.ctx_free_ref = ctx_free_id;
utarray_push_back(plugin_env->plugin_array, &new_plugin);
plugin_env->plugin_count += 1;
lua_settop(state, 0);
lua_pushinteger(state, plugin_id);
return 1;
}
/* ***** ***** ***** ***** ***** ***** */
int lua_session_get_id(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 1)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
struct session *sess = (struct session *)lua_topointer(state, -1);
if (!sess)
{
lua_settop(state, 0);
return 0;
}
lua_pop(state, 1);
lua_pushinteger(state, session_get_id(sess));
return 1;
}
int lua_session_set_id(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 2)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
int setid = lua_tointeger(state, -1);
lua_pop(state, 1);
struct session *sess = (struct session *)lua_topointer(state, -1);
lua_pop(state, 1);
session_set_id(sess, setid);
return 0;
}
int lua_session_get_type(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 1)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
struct session *sess = (struct session *)lua_topointer(state, -1);
if (!sess)
{
lua_settop(state, 0);
return 0;
}
lua_pop(state, 1);
lua_pushinteger(state, session_get_type(sess));
return 1;
}
int lua_session_set_type(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 2)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
int settype = lua_tointeger(state, -1);
lua_pop(state, 1);
struct session *sess = (struct session *)lua_topointer(state, -1);
lua_pop(state, 1);
session_set_id(sess, settype);
return 0;
}
/* ***** ***** ***** ***** ***** ***** */
/*
* TODO: 未完整考虑线程安全问题, 例如
* 多个线程同时注册一个topic, 是否需要做处理等。
*/
static UT_icd lua_plugin_mq_icd = {sizeof(struct lua_plugin_mq), NULL, NULL, NULL};
int lua_mq_create_topic(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 4)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TFUNCTION ||
lua_type(state, -3) != LUA_TSTRING || lua_type(state, -4) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
/* 创建对该table的引用, 防止该table被回收 */
int private_ref = luaL_ref(state, LUA_REGISTRYINDEX); /* stack top function */
if (private_ref == LUA_REFNIL)
{
lua_settop(state, 0);
return 0;
}
/* 导出free message的调用函数 */
int free_ref = luaL_ref(state, LUA_REGISTRYINDEX);
if (free_ref == LUA_REFNIL)
{
luaL_unref(state, LUA_REGISTRYINDEX, private_ref);
lua_settop(state, 0);
return 0;
}
/* 出栈队列名称 */
char *name = strdup((char *)lua_tostring(state, -1));
lua_pop(state, 1);
/* 出栈传入的stellar */
struct stellar *st = (struct stellar *)lua_topointer(state, -1);
if (!st)
{
luaL_unref(state, LUA_REGISTRYINDEX, free_ref);
luaL_unref(state, LUA_REGISTRYINDEX, private_ref);
if (name)
free(name);
lua_settop(state, 0);
return 0;
}
lua_settop(state, 0);
/* 插入新的元素 */
struct lua_message_mq new_mq;
memset(&new_mq, 0, sizeof(new_mq));
utarray_push_back(global_schema->message_mq_array, &new_mq);
/* 从队列尾部取出最后一个元素 */
struct lua_message_mq *mq = utarray_eltptr(global_schema->message_mq_array, (utarray_len(global_schema->message_mq_array) - 1));
mq->freemessage_ref = free_ref;
mq->mq_private_ref = private_ref;
/* 调用stellar中mq的topic创建函数 */
/* BugFix: 仔细看了代码, 在stellar中没有对name做拷贝处理, 这里name不能释放 */
int topic_id = stellar_session_mq_create_topic(st, (const char *)name, lpm_message_free_func, mq);
/* 创建topic失败, 还原创建topic之前的状态 */
if (topic_id < 0)
{
utarray_pop_back(global_schema->message_mq_array);
luaL_unref(state, LUA_REGISTRYINDEX, free_ref);
luaL_unref(state, LUA_REGISTRYINDEX, private_ref);
if (name)
free(name);
return 0;
}
lua_rawgeti(state, LUA_REGISTRYINDEX, mq->mq_private_ref);
#if 0
/* 不检查了, 直接覆盖 */
/* 在传入的table中检查是否存在与mq_private_env同名的元素 */
lua_getfield(state, -1, LUA_MQ_ENV_DEFAULT_KEY); /* stack top nil */
if (lua_type(state, -1) != LUA_TNIL) {
lua_settop(state, 0);
return 0;
}
lua_pop(state, 1); /* stack top table */
#endif
/* 在该table中加入新元素 */
lua_pushlightuserdata(state, (void *)mq); /* stack top new_mq */
lua_setfield(state, -2, LUA_MQ_ENV_DEFAULT_KEY); /* stack top table */
mq->topic_id = topic_id;
lua_settop(state, 0);
lua_pushinteger(state, topic_id);
return 1;
}
int lua_mq_get_topic_id(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 2)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TSTRING || lua_type(state, -2) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
/* 出栈队列名称 */
char *name = strdup(lua_tostring(state, -1));
lua_pop(state, 1);
/* 出栈传入的stellar */
struct stellar *st = (struct stellar *)lua_topointer(state, -1);
if (!st)
{
if (name)
free(name);
return 0;
}
lua_pop(state, 1);
int topic_id = stellar_session_mq_get_topic_id(st, (const char *)name);
if (name)
free(name);
lua_settop(state, 0);
lua_pushinteger(state, topic_id);
return 1;
}
int lua_mq_update_topic(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 4)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TFUNCTION ||
lua_type(state, -3) != LUA_TNUMBER || lua_type(state, -4) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
/* 创建对该table的引用, 防止该table被回收 */
int private_ref = luaL_ref(state, LUA_REGISTRYINDEX); /* stack top function */
if (private_ref == LUA_REFNIL)
{
lua_settop(state, 0);
return 0;
}
/* 导出free message的调用函数 */
int free_ref = luaL_ref(state, LUA_REGISTRYINDEX);
if (free_ref == LUA_REFNIL)
{
luaL_unref(state, LUA_REGISTRYINDEX, private_ref);
lua_settop(state, 0);
return 0;
}
/* topic_id */
int topic_id = lua_tointeger(state, -1);
lua_pop(state, 1);
/* 出栈传入的stellar */
struct stellar *st = (struct stellar *)lua_topointer(state, -1);
if (!st)
{
luaL_unref(state, LUA_REGISTRYINDEX, free_ref);
luaL_unref(state, LUA_REGISTRYINDEX, private_ref);
lua_settop(state, 0);
return 0;
}
lua_settop(state, 0);
struct lua_message_mq *mq = search_message_mq_by_id(topic_id);
if (!mq || mq->topic_id != topic_id)
{
/* 如果topic不是lua创建的, 需要加入管理 */
struct lua_message_mq new_mq;
memset(&new_mq, 0, sizeof(new_mq));
utarray_push_back(global_schema->message_mq_array, &new_mq);
mq = utarray_eltptr(global_schema->message_mq_array, (utarray_len(global_schema->message_mq_array) - 1));
if (stellar_session_mq_update_topic(st, topic_id, lpm_message_free_func, mq))
{
utarray_pop_back(global_schema->message_mq_array);
luaL_unref(state, LUA_REGISTRYINDEX, free_ref);
luaL_unref(state, LUA_REGISTRYINDEX, private_ref);
return 0;
}
mq->freemessage_ref = free_ref;
mq->mq_private_ref = private_ref;
mq->topic_id = topic_id;
lua_pushboolean(state, 1);
return 1;
}
else
{
/* 本身是由lua创建的 */
/* 更新private_ref */
lua_rawgeti(state, LUA_REGISTRYINDEX, private_ref);
lua_pushlightuserdata(state, (void *)mq); /* stack top new_mq */
lua_setfield(state, -2, LUA_MQ_ENV_DEFAULT_KEY); /* stack top table */
luaL_unref(state, LUA_REGISTRYINDEX, mq->mq_private_ref);
mq->mq_private_ref = private_ref;
/* 更新free function ref */
luaL_unref(state, LUA_REGISTRYINDEX, mq->freemessage_ref);
mq->freemessage_ref = free_ref;
}
lua_settop(state, 0);
lua_pushboolean(state, 1);
return 1;
}
int lua_mq_destory_topic(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 2)
{
lua_settop(state, 0);
return 0;
}
/* topic_id */
int topic_id = lua_tointeger(state, -1);
lua_pop(state, 1);
/* 出栈传入的stellar */
struct stellar *st = (struct stellar *)lua_topointer(state, -1);
if (!st)
{
lua_settop(state, 0);
return 0;
}
lua_settop(state, 0);
/* 优先调用C函数进行卸载 */
if (stellar_session_mq_destroy_topic(st, topic_id) < 0)
{
return 0;
}
/* 不方便删除, 将id置为-1, 确保匹配不到 */
struct lua_message_mq *mq = search_message_mq_by_id(topic_id);
if (mq)
{
mq->topic_id = -1;
luaL_unref(state, LUA_REGISTRYINDEX, mq->mq_private_ref);
luaL_unref(state, LUA_REGISTRYINDEX, mq->freemessage_ref);
mq->freemessage_ref = 0;
mq->mq_private_ref = 0;
}
lua_pushboolean(state, 1);
return 1;
}
int lua_mq_subscribe_topic(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 2)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TFUNCTION ||
lua_type(state, -3) != LUA_TNUMBER || lua_type(state, -4) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
/* 读取参数 */
int plugin_id = lua_tointeger(state, -1);
lua_pop(state, 1);
int on_message_ref = luaL_ref(state, -1);
if (on_message_ref == LUA_REFNIL)
{
lua_settop(state, 0);
return 0;
}
int topic_id = lua_tointeger(state, -1);
lua_pop(state, 1);
struct stellar *st = (struct stellar *)lua_topointer(state, -1);
if (!st)
{
lua_settop(state, 0);
return 0;
}
lua_pop(state, 1);
lua_settop(state, 0);
if (stellar_session_mq_subscribe(st, topic_id, lpm_on_session_msg_func, plugin_id))
{
/* 订阅失败, 返回false */
lua_pushboolean(state, 0);
}
else
{
struct lua_plugin *plugin = search_plugin_by_id(plugin_id);
if (plugin)
{
if (!plugin->sub_topic_array)
{
/* 该插件尚未注册任何topic, 注册第一个topic时将创建接收的topic列表 */
utarray_new(plugin->sub_topic_array, &lua_plugin_mq_icd);
}
else
{
/* 如果该插件中之前已经订阅过该消息, 更新message函数 */
struct lua_plugin_mq *mq = NULL;
while ((mq = utarray_next(plugin->sub_topic_array, mq)))
{
if (mq->topic_id == topic_id)
{
luaL_unref(state, LUA_REGISTRYINDEX, mq->onmessage_ref);
mq->onmessage_ref = on_message_ref;
lua_pushboolean(state, 1);
return 1;
}
}
}
struct lua_plugin_mq new_mq;
memset(&new_mq, 0, sizeof(new_mq));
new_mq.topic_id = topic_id;
new_mq.onmessage_ref = on_message_ref;
utarray_push_back(plugin->sub_topic_array, &new_mq);
}
/* 订阅成功, 返回true */
lua_pushboolean(state, 1);
}
return 1;
}
int lua_mq_publish_message(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 3)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TNUMBER ||
lua_type(state, -3) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
/* 倒序依次获取参数 */
int mess_ref = luaL_ref(state, LUA_REGISTRYINDEX);
if (mess_ref == LUA_REFNIL)
{
lua_settop(state, 0);
return 0;
}
int topic_id = lua_tointeger(state, -1);
lua_pop(state, 1);
struct session *sess = (struct session *)lua_topointer(state, -1);
if (!sess)
{
luaL_unref(state, LUA_REGISTRYINDEX, mess_ref);
lua_settop(state, 0);
return 0;
}
lua_settop(state, 0);
/* 创建一段数据引用 */
struct lua_context *new_context = (struct lua_context *)calloc(1, sizeof(struct lua_context));
if (__glibc_unlikely(!new_context))
{
luaL_unref(state, LUA_REGISTRYINDEX, mess_ref);
return 0;
}
new_context->context_ref_id = mess_ref;
/* 调用C接口发布消息 */
if (session_mq_publish_message(sess, topic_id, new_context))
{
luaL_unref(state, LUA_REGISTRYINDEX, new_context->context_ref_id);
free(new_context);
lua_pushboolean(state, 0);
}
else
{
lua_pushboolean(state, 1);
}
return 1;
}
int lua_mq_ignore_message(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 4)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TNUMBER ||
lua_type(state, -3) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
/* 倒序获取参数 */
int plugin_id = lua_tointeger(state, -1);
lua_pop(state, 1);
int topic_id = lua_tointeger(state, -1);
lua_pop(state, 1);
struct session * sess = (struct session *)lua_topointer(state, -1);
if ( !sess ) {
lua_settop(state, 0);
return 0;
}
lua_settop(state, 0);
if (session_mq_ignore_message(sess, topic_id, plugin_id))
lua_pushboolean(state, 0);
else
lua_pushboolean(state, 1);
return 1;
}
int lua_mq_unignore_message(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 4)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TNUMBER ||
lua_type(state, -3) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
/* 倒序获取参数 */
int plugin_id = lua_tointeger(state, -1);
lua_pop(state, 1);
int topic_id = lua_tointeger(state, -1);
lua_pop(state, 1);
struct session * sess = (struct session *)lua_topointer(state, -1);
if ( !sess ) {
lua_settop(state, 0);
return 0;
}
lua_settop(state, 0);
if (session_mq_unignore_message(sess, topic_id, plugin_id))
lua_pushboolean(state, 0);
else
lua_pushboolean(state, 1);
return 1;
}
int lua_mq_topic_is_active(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 4)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
/* 倒序获取参数 */
int topic_id = lua_tointeger(state, -1);
lua_pop(state, 1);
struct session * sess = (struct session *)lua_topointer(state, -1);
if ( !sess ) {
lua_settop(state, 0);
return 0;
}
lua_settop(state, 0);
/* 1 means active */
if (session_mq_topic_is_active(sess, topic_id) == 1)
lua_pushboolean(state, 1);
else
lua_pushboolean(state, 0);
return 1;
}

View File

@@ -0,0 +1,52 @@
/*************************************************************************
> File Name: lua_binding_functions.h
> Author:
> Created Time: 2024-08
> Encoding : UTF-8
************************************************************************/
/*************************************************************************
* 声明并定义所有需要在lua状态机中绑定的函数
* version
* [ v0.1 ]
* 08-14
* 1. 新增函数声明
* 新增插件注册函数
* int lua_plugin_manage_regist
* 新增会话相关函数
* int lua_session_get_id
* int lua_session_set_id
* int lua_session_get_type
* int lua_session_set_type
* 新增message相关函数
* int lua_mq_create_topic
* int lua_mq_get_topic_id
* int lua_mq_update_topic
* int lua_mq_destory_topic
* int lua_mq_subscribe_topic
* int lua_mq_topic_is_active
* int lua_mq_publish_message
* int lua_mq_ignore_message
* int lua_mq_unignore_message
************************************************************************/
#include "lua_plugin_manage_internal.h"
/* 需要注册至lua中供lua调用的所有函数原型 */
int lua_plugin_manage_regist(lua_State *state);
/* 与struct session结构相关的函数 */
int lua_session_get_id(lua_State *state);
int lua_session_set_id(lua_State *state);
int lua_session_get_type(lua_State *state);
int lua_session_set_type(lua_State *state);
/* 与stellar message mq相关的函数 */
int lua_mq_create_topic(lua_State * state);
int lua_mq_get_topic_id(lua_State * state);
int lua_mq_update_topic(lua_State * state);
int lua_mq_destory_topic(lua_State * state);
int lua_mq_subscribe_topic(lua_State * state);
int lua_mq_topic_is_active(lua_State * state);
int lua_mq_publish_message(lua_State * state);
int lua_mq_ignore_message(lua_State * state);
int lua_mq_unignore_message(lua_State * state);

View File

@@ -22,19 +22,16 @@
* 08-12
* 1. 修改函数lua_cbinding_function, 参数与lua_cbinding_data保持统一
* 2. 修改部分函数返回值, 使用枚举代替错误码返回值, 方便统一处理
*
* 08-14
* 1. 将所有待注册函数移动至新文件中, 不再此文件中实现
************************************************************************/
#include "lua_plugin_manage_internal.h"
#include "lua_binding_functions.h"
#include <stdlib.h>
#include <string.h>
/* 需要注册至lua中供lua调用的所有函数原型 */
int lua_plugin_manage_regist(lua_State *state);
int lua_session_get_id(lua_State *state);
int lua_session_set_id(lua_State *state);
int lua_session_get_type(lua_State *state);
int lua_session_set_type(lua_State *state);
/* 需要注册至状态机中的函数定义在链表中, 会依次完成注册 */
struct lua_binding_function lua_bind_functions[] = {
{lua_plugin_manage_regist, "register", "plugin_manage"},
@@ -42,6 +39,15 @@ struct lua_binding_function lua_bind_functions[] = {
{lua_session_set_id, "setid", "session"},
{lua_session_get_type, "gettype", "session"},
{lua_session_set_type, "settype", "session"},
{lua_mq_create_topic, "createtopic", "message"},
{lua_mq_get_topic_id, "gettopicid", "message"},
{lua_mq_update_topic, "updatetopic", "message"},
{lua_mq_destory_topic, "destorytopic", "message"},
{lua_mq_subscribe_topic, "subscribetopic", "message"},
{lua_mq_topic_is_active, "topicisactive", "message"},
{lua_mq_publish_message, "publishmessage", "message"},
{lua_mq_ignore_message, "ignoremessage", "message"},
{lua_mq_unignore_message, "unignoremessage", "message"},
{NULL, NULL, NULL},
};
@@ -464,211 +470,3 @@ int lua_cbinding_datas(lua_State *state)
}
return failed_count;
}
/* ***** ***** ***** ***** ***** ***** */
int lua_plugin_manage_regist(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 4)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TTABLE || lua_type(state, -2) != LUA_TFUNCTION ||
lua_type(state, -3) != LUA_TFUNCTION || lua_type(state, -4) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
/* 取出处理第四个参数 */
lua_getfield(state, -1, LUA_PLUGIN_ENV_DEFAULT_KEY); /* stack 4, table中取出对应结构的指针 */
if (lua_type(state, -1) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
struct lua_model *plugin_env = (struct lua_model *)lua_topointer(state, -1);
lua_pop(state, 2);
// debug_lua_state_stack(state, 0, "here");
// printf("env pointer is %p\n", plugin_env);
/* 取出处理第三个参数 */
int ctx_free_id = luaL_ref(state, LUA_REGISTRYINDEX); /* stack 3 */
if (ctx_free_id == LUA_REFNIL)
{
lua_settop(state, 0);
return 0;
}
/* 取出处理第二个参数 */
int ctx_new_id = luaL_ref(state, LUA_REGISTRYINDEX); /* stack 2 */
if (ctx_new_id == LUA_REFNIL)
{
lua_settop(state, 0);
return 0;
}
/* 取出处理第一个参数 */
struct stellar *st = (struct stellar *)lua_topointer(state, -1); /* stack 1 */
if (!st)
{
lua_settop(state, 0);
return 0;
}
lua_pop(state, 1);
/* 在stellar中注册, 获取注册id */
int plugin_id = stellar_session_plugin_register(st, lpm_ctx_new_func, lpm_ctx_free_func, (void *)plugin_env);
#ifdef LUAPLUGIN_BASIC_UNITTEST
LOGDEBUG("now regist new plugin, plugin id is %d, %d, %d\n", plugin_id, ctx_new_id, ctx_free_id);
#endif
/* TODO: 如果运行完全符合预期的话, 理论上仅有thread 0在此处需要插入新的插件, 且不应该有错误
* 对于其他线程这里应该直接检查ref id是否一致即可, 按理说不应该再插入新插件
* 后续可以修改为根据线程号执行不同的处理流程
*/
/* 如果在其他线程中已经完成过注册 */
struct lua_plugin *search_plugin = NULL;
while ((search_plugin = utarray_next(plugin_env->plugin_array, search_plugin)))
{
if (search_plugin->plugin_id == plugin_id)
{
/* 初始化过程中已经进行过加载 */
if (search_plugin->ctx_new_ref != ctx_new_id || search_plugin->ctx_free_ref != ctx_free_id)
{
LOGERROR("regist plugin, same id with different function ref");
LOGERROR("plugin id %d, registed %d, %d, new ref %d, %d", plugin_id,
search_plugin->ctx_new_ref, search_plugin->ctx_free_ref,
ctx_new_id, ctx_free_id);
lua_settop(state, 0);
return 0;
}
lua_settop(state, 0);
lua_pushinteger(state, plugin_id);
return 1;
}
}
/* 将注册完成的新插件插入到队列中 */
struct lua_plugin new_plugin;
memset(&new_plugin, 0, sizeof(new_plugin));
new_plugin.plugin_id = plugin_id;
new_plugin.ctx_new_ref = ctx_new_id;
new_plugin.ctx_free_ref = ctx_free_id;
utarray_push_back(plugin_env->plugin_array, &new_plugin);
plugin_env->plugin_count += 1;
lua_settop(state, 0);
lua_pushinteger(state, plugin_id);
return 1;
}
int lua_session_get_id(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 1)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
struct session *sess = (struct session *)lua_topointer(state, -1);
if (!sess)
{
lua_settop(state, 0);
return 0;
}
lua_pop(state, 1);
lua_pushinteger(state, session_get_id(sess));
return 1;
}
int lua_session_set_id(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 2)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
int setid = lua_tointeger(state, -1);
lua_pop(state, 1);
struct session *sess = (struct session *)lua_topointer(state, -1);
lua_pop(state, 1);
session_set_id(sess, setid);
return 0;
}
int lua_session_get_type(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 1)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
struct session *sess = (struct session *)lua_topointer(state, -1);
if (!sess)
{
lua_settop(state, 0);
return 0;
}
lua_pop(state, 1);
lua_pushinteger(state, session_get_type(sess));
return 1;
}
int lua_session_set_type(lua_State *state)
{
/* 参数个数检查 */
if (lua_gettop(state) != 2)
{
lua_settop(state, 0);
return 0;
}
/* 参数类型检查 */
if (lua_type(state, -1) != LUA_TNUMBER || lua_type(state, -2) != LUA_TLIGHTUSERDATA)
{
lua_settop(state, 0);
return 0;
}
int settype = lua_tointeger(state, -1);
lua_pop(state, 1);
struct session *sess = (struct session *)lua_topointer(state, -1);
lua_pop(state, 1);
session_set_id(sess, settype);
return 0;
}

View File

@@ -12,6 +12,14 @@
* 1. 实现函数
* void *lpm_ctx_new_func;
* void lpm_ctx_free_func;
*
* 08-13
* 1. 由于context结构体修改, 部分函数调用及处理逻辑需要同步修改
*
* 08-15
* 1. 实现函数
* void lpm_message_free_func
* void lpm_on_session_msg_func
************************************************************************/
#include "lua_plugin_manage_internal.h"
@@ -48,9 +56,9 @@ void *lpm_ctx_new_func(
/* 获取当前的线程id并找到该线程对应的state */
int thread_id = session_get_threadid(sess);
if ( thread_id > global_schema->state_count )
if (thread_id > global_schema->state_count)
return NULL;
lua_State * state = global_schema->thread_state[thread_id - 1];
lua_State *state = global_schema->thread_state[thread_id];
struct lua_context *new_context = lua_context_new(state);
if (__glibc_unlikely(!new_context))
/* 创建新的context失败 */
@@ -67,7 +75,7 @@ void *lpm_ctx_new_func(
if (lua_chunk_execute(state, plugin->ctx_new_ref, 3, param, 0, NULL))
{
/* 脚本执行失败 */
free(new_context);
lua_context_free(state, new_context);
return NULL;
}
@@ -90,7 +98,6 @@ void lpm_ctx_free_func(
{
if (__glibc_unlikely(!sess || !sess_ctx || !plugin_env))
return;
struct lua_context *context = (struct lua_context *)sess_ctx;
struct lua_model *env = (struct lua_model *)plugin_env;
/* 获取插件ID并找到该插件 */
@@ -107,20 +114,138 @@ void lpm_ctx_free_func(
return;
int thread_id = session_get_threadid(sess);
if ( thread_id > global_schema->state_count )
if (thread_id > global_schema->state_count)
return;
lua_State * state = global_schema->thread_state[thread_id];
lua_State *state = global_schema->thread_state[thread_id];
struct lua_cdata param[3] = {0};
param[0].cdata_type = DATATYPE_POINTER;
param[0].cdata_pointer = sess;
param[1].cdata_type = DATATYPE_CONTEXT;
param[1].cdata_context = context;
param[1].cdata_context = (struct lua_context *)sess_ctx;
param[2].cdata_type = DATATYPE_TABLE;
param[2].cdata_table = env->private_env_ref;
lua_chunk_execute(state, plugin->ctx_free_ref, 3, param, 0, NULL);
lua_context_free(state, context);
lua_context_free(state, (struct lua_context *)sess_ctx);
return;
}
/*
* Function: lpm_message_free_func
* Input: | struct session * | sess | 释放message对应的session
* | void * | msg | 需要释放的message消息
* | void * | msg_free_arg | 释放函数需要存储的私有数据
* Output:
* Return:
* Description: 与C插件保持一致的session_msg_free_cb_func, 作为lua插件实现对应功能的通用函数
*/
void lpm_message_free_func(
struct session *sess,
void *msg,
void *msg_free_arg)
{
if (__glibc_unlikely(!sess || !msg || !msg_free_arg))
return;
struct lua_message_mq *plugin_mq = (struct lua_message_mq *)msg_free_arg;
int thread_id = session_get_threadid(sess);
if (thread_id > global_schema->state_count)
return;
lua_State *state = global_schema->thread_state[thread_id];
struct lua_cdata param[3] = {0};
param[0].cdata_type = DATATYPE_POINTER;
param[0].cdata_pointer = sess;
param[1].cdata_type = DATATYPE_CONTEXT;
param[1].cdata_context = (struct lua_context *)msg;
param[2].cdata_type = DATATYPE_TABLE;
param[2].cdata_table = plugin_mq->mq_private_ref;
lua_chunk_execute(state, plugin_mq->freemessage_ref, 3, param, 0, NULL);
lua_context_free(state, (struct lua_context *)msg);
return;
}
/*
* Function: lpm_on_session_msg_func
* Input: | struct session * | sess | 会话信息
* | int | topic_id | 处理的message topic id
* | const void * | msg | 需要处理的消息信息
* | void * | sess_ctx | 会话中的私有数据
* | void * | plugin_env | 插件运行环境
* Output:
* Return:
* Description: 与C插件管理器保持一致的ctx_free_func
*/
void lpm_on_session_msg_func(
struct session *sess,
int topic_id,
const void *msg,
void *sess_ctx,
void *plugin_env)
{
if (__glibc_unlikely(!sess || !sess || !plugin_env))
return;
struct lua_context *sess_context = (struct lua_context *)sess_ctx;
struct lua_model *env = (struct lua_model *)plugin_env;
/* 获取插件ID并找到该插件 */
int plugin_id = session_get_pluginid(sess);
// int plugin_id = 1;
struct lua_plugin *plugin = NULL;
while ((plugin = utarray_next(env->plugin_array, plugin)))
{
if (plugin->plugin_id == plugin_id)
break;
}
if (!plugin || plugin->plugin_id != plugin_id || !plugin->sub_topic_array)
/* 未找到该插件, 或者在该插件中未发现注册的topic_id */
return;
struct lua_plugin_mq *mq = NULL;
while ((mq = utarray_next(plugin->sub_topic_array, mq)))
{
if (mq->topic_id == topic_id)
break;
}
if (!mq || mq->topic_id != topic_id)
/* 未找到对应的消息处理函数 */
return;
/* 判断该消息是由C端插件产生的还是由Lua插件产生的 */
struct lua_message_mq *message_mq = search_message_mq_by_id(topic_id);
int thread_id = session_get_threadid(sess);
if (thread_id > global_schema->state_count)
return;
lua_State *state = global_schema->thread_state[thread_id];
struct lua_cdata params[5] = {0};
params[0].cdata_type = DATATYPE_POINTER;
params[0].cdata_pointer = sess;
params[1].cdata_type = DATATYPE_INT;
params[1].cdata_int = topic_id;
if (!message_mq)
{
/* C端产生的直接使用指针 */
params[2].cdata_type = DATATYPE_POINTER;
params[2].cdata_pointer = (void *)msg;
}
else
{
/* lua端产生的为一个context结构 */
params[2].cdata_type = DATATYPE_CONTEXT;
params[2].cdata_context = (struct lua_context *)msg;
}
params[3].cdata_type = DATATYPE_CONTEXT;
params[3].cdata_context = sess_context;
params[4].cdata_type = DATATYPE_TABLE;
params[4].cdata_table = env->private_env_ref;
lua_chunk_execute(state, mq->onmessage_ref, 5, params, 0, NULL);
return;
}

View File

@@ -29,6 +29,12 @@
* 08-06
* 1. 实现函数
* int script_execute;
*
* 08-13
* 1. 修改部分创建列表使用的结构
* 2. 实现函数
* struct lua_plugin * search_plugin_by_id
* struct lua_message_mq * search_message_mq_by_id
************************************************************************/
#include "lua_plugin_manage_internal.h"
@@ -39,6 +45,68 @@
struct lua_plugin_manage_schema * global_schema = NULL;
#if 0
void lua_message_mq_destory(void * elt)
{
if (!elt) return;
struct lua_message_mq * mq = (struct lua_message_mq *)elt;
if ( mq->topic_name )
free(mq->topic_name);
return;
}
#endif
static UT_icd lua_message_mq_icd = {sizeof(struct lua_message_mq), NULL, NULL, NULL};
void lua_plugin_destory(void *elt)
{
if ( !elt ) return;
struct lua_plugin * plugin = (struct lua_plugin *)elt;
if ( plugin->sub_topic_array )
utarray_free(plugin->sub_topic_array);
return;
}
static UT_icd lua_plugin_icd = {sizeof(struct lua_plugin), NULL, NULL, lua_plugin_destory};
/*
* Function: search_plugin_by_id
* Input: | int | plugin_id | 需要查找plugin数据的id
* Output:
* Return: 查找得到的plugin结构
* Description: 在global schema中根据plugin_id查找一个plugin
*/
struct lua_plugin * search_plugin_by_id(int plugin_id)
{
for ( int i = 0; i < global_schema->model_count; ++i) {
struct lua_model * model = &global_schema->model[i];
struct lua_plugin * plugin = NULL;
while ( (plugin = utarray_next(model->plugin_array, plugin)) ) {
if ( plugin->plugin_id == plugin_id )
return plugin;
else if (plugin->plugin_id > plugin_id)
return NULL;
}
}
return NULL;
}
/*
* Function: search_message_mq_by_id
* Input: | int | topic_id | 需要查找的topic_id
* Output:
* Return: 查找得到的message_mq结构
* Description: 在global schema中根据topic_id查找一个message_mq
*/
struct lua_message_mq * search_message_mq_by_id(int topic_id)
{
struct lua_message_mq * mq = NULL;
while ((mq = utarray_next(global_schema->message_mq_array, mq))) {
if ( mq->topic_id == topic_id )
return mq;
}
return NULL;
}
/*
* Function: thread_state_init
* Input: | int | thread_id | 创建状态机的线程号
@@ -166,8 +234,6 @@ int thread_state_load_specific(
return SUCCESS;
}
static UT_icd lua_plugin_icd = {sizeof(struct lua_plugin), NULL, NULL, NULL};
/*
* Function: thread_state_call_load
* Input: | lua_State * | state | 进行模块加载的状态机
@@ -313,6 +379,11 @@ struct lua_plugin_manage_schema *lua_plugin_manage_init(
}
}
}
/* 可能运行过程中创建新的topic, 此处进行初始化 */
new_schema->mq_count = 0;
utarray_new(new_schema->message_mq_array, &lua_message_mq_icd);
debug_lua_plugin_manage_schema(new_schema);
global_schema = new_schema;
@@ -355,6 +426,9 @@ void lua_plugin_manage_exit(struct lua_plugin_manage_schema *lua_plug_mgr)
}
free(lua_plug_mgr->model);
if ( lua_plug_mgr->message_mq_array )
utarray_free(lua_plug_mgr->message_mq_array);
free(lua_plug_mgr);
return;
}

View File

@@ -74,6 +74,22 @@
* 整体重构状态机相关结构定义, 拆分高频访问数据与低频访问数据
* 状态机中数据管理由树形结构修改为数组型结构
* 经过多次验证, 相同状态机在执行相同操作后返回结果及中间产物一致, 合并一些冗余数据
* 1. 声明并定义结构
* struct lua_plugin_statistics;
* struct lua_plugin;
* struct lua_model;
* struct lua_plugin_manage_schema;
*
* 08-14
* 新增message相关结构定义
* 1. 声明并定义结构
* struct lua_plugin_mq;
* struct lua_message_mq;
* 2. 声明函数
* void lpm_message_free_func
* void lpm_on_session_msg_func
* struct lua_plugin * search_plugin_by_id
* struct lua_message_mq * search_message_mq_by_id
************************************************************************/
#ifndef LUA_PLUGIN_MANAGE_INTERNAL_H
#define LUA_PLUGIN_MANAGE_INTERNAL_H
@@ -209,7 +225,7 @@ void lua_cdata_destory(struct lua_cdata *cdata);
/* 上下文结构, 保存临时数据 */
struct lua_context
{
lua_State *context_state;
// lua_State *context_state;
int context_ref_id;
};
@@ -224,7 +240,8 @@ void lua_context_free(lua_State *state, struct lua_context *context);
/* 此部分为注册至C中的lua通用函数, 实现在lua_plugin_cfunc.c中 */
void *lpm_ctx_new_func(struct session *sess, void *plugin_env);
void lpm_ctx_free_func(struct session *sess, void *sess_ctx, void *plugin_env);
// void lpm_on_session_msg_func(struct session *sess, int topic_id, const void *msg, void *sess_ctx, void *plugin_env);
void lpm_message_free_func(struct session *sess, void *msg, void *msg_free_arg);
void lpm_on_session_msg_func(struct session *sess, int topic_id, const void *msg, void *sess_ctx, void *plugin_env);
/* ***** ***** ***** ***** ***** ***** */
/* lua代码块相关操作, 实现在lua_plugin_chunk.c中 */
@@ -233,7 +250,6 @@ int lua_chunk_execute(lua_State *state, int ref_id, int pcount, struct lua_cdata
/* ***** ***** ***** ***** ***** ***** */
/* 状态机相关的一些数据结构及操作, 实现在lua_plugin_manage.c中 */
extern struct lua_plugin_manage_schema *global_schema;
/* TODO:统计插件的运行情况, 暂时没想好怎么用 */
@@ -250,6 +266,17 @@ struct lua_plugin_statistics
int free_failed_count;
};
#define LUA_MQ_ENV_DEFAULT_KEY "__mqenv_pointer"
#define LUA_MQ_TOPIC_ID_KEY "topic_id"
/* 保存lua插件注册的消息队列信息 */
struct lua_plugin_mq
{
/* 消息队列ID, 消息队列可能是订阅得到或者创建得到, 此处保存对应topic信息 */
int topic_id;
/* 如果是订阅的topic必须包含处理函数 */
int onmessage_ref;
};
/* 保存Lua插件信息 */
struct lua_plugin
{
@@ -259,11 +286,13 @@ struct lua_plugin
int ctx_new_ref;
/* context_free函数在状态机中的引用值 */
int ctx_free_ref;
UT_array * sub_topic_array;
};
#define MODEL_MARK_INIT_DONE 0x0001
#define MODEL_MARK_LOAD_DONE 0x0002
#define LUA_PLUGIN_ENV_DEFAULT_KEY "__penv_pointer"
/* 加载的lua模块, 一个lua模块一般来说对应一个lua文件, 与C插件管理中的so相同 */
struct lua_model
{
@@ -281,6 +310,19 @@ struct lua_model
unsigned short plugin_count;
};
/* 由lua创建的topic结构, 该结构保存在schema中 */
struct lua_message_mq
{
/* 消息队列ID, 消息队列可能是订阅得到或者创建得到, 此处保存对应topic信息 */
int topic_id;
/* 如果是新创建的topic, 必须包含释放函数 */
int freemessage_ref;
/* 创建一个消息处理私有数据, 对应session_mq中的msg_free_arg */
int mq_private_ref;
// char * topic_name;
};
#define LUA_STATE_THREAD_ID_KEY "__thread_id"
struct lua_plugin_manage_schema
{
struct stellar *st;
@@ -292,10 +334,14 @@ struct lua_plugin_manage_schema
/* */
int model_count;
struct lua_model *model;
int mq_count;
/* TODO: 创建的所有message topic id理论上应该是连续的, 可以考虑用hash数组, 寻址能更快 */
UT_array * message_mq_array;
};
#define LUA_PLUGIN_ENV_DEFAULT_KEY "__penv_pointer"
#define LUA_STATE_THREAD_ID_KEY "__thread_id"
struct lua_plugin * search_plugin_by_id(int plugin_id);
struct lua_message_mq * search_message_mq_by_id(int topic_id);
#ifdef LUAPLUGIN_BASIC_UNITTEST
void debug_lua_state_stack(lua_State *state, int mod, const char *message);