stellar registers packet_manager as a module; session_manager registers polling callback
This commit is contained in:
@@ -1,2 +1 @@
|
|||||||
install(FILES stellar.toml DESTINATION conf COMPONENT PROGRAM)
|
install(FILES stellar.toml DESTINATION conf COMPONENT PROGRAM)
|
||||||
install(FILES log.toml DESTINATION conf COMPONENT PROGRAM)
|
|
||||||
@@ -1,4 +0,0 @@
|
|||||||
[log]
|
|
||||||
output = "both" # stderr, file, both
|
|
||||||
file = "log/stellar.log"
|
|
||||||
level = "INFO" # TRACE, DEBUG, INFO, WARN, ERROR, FATAL
|
|
||||||
@@ -61,4 +61,24 @@
|
|||||||
|
|
||||||
[stat]
|
[stat]
|
||||||
merge_interval_ms = 500 # range: [0, 60000] (ms)
|
merge_interval_ms = 500 # range: [0, 60000] (ms)
|
||||||
output_interval_ms = 1000 # range: [0, 60000] (ms)
|
output_interval_ms = 1000 # range: [0, 60000] (ms)
|
||||||
|
|
||||||
|
[log]
|
||||||
|
output = "both" # stderr, file, both
|
||||||
|
file = "log/stellar.log"
|
||||||
|
level = "INFO" # TRACE, DEBUG, INFO, WARN, ERROR, FATAL
|
||||||
|
|
||||||
|
[[module]]
|
||||||
|
path = ""
|
||||||
|
init = "polling_manager_on_init"
|
||||||
|
exit = "polling_manager_on_exit"
|
||||||
|
|
||||||
|
[[module]]
|
||||||
|
path = ""
|
||||||
|
init = "packet_manager_on_init"
|
||||||
|
exit = "packet_manager_on_exit"
|
||||||
|
|
||||||
|
[[module]]
|
||||||
|
path = ""
|
||||||
|
init = "session_manager_on_init"
|
||||||
|
exit = "session_manager_on_exit"
|
||||||
@@ -8,6 +8,9 @@ extern "C"
|
|||||||
#include "stellar/mq.h"
|
#include "stellar/mq.h"
|
||||||
#include "stellar/log.h"
|
#include "stellar/log.h"
|
||||||
|
|
||||||
|
#define PACKET_MANAGER_MODULE_NAME "packet_manager_module"
|
||||||
|
#define SESSION_MANAGER_MODULE_NAME "session_manager_module"
|
||||||
|
|
||||||
struct stellar_module;
|
struct stellar_module;
|
||||||
struct stellar_module *stellar_module_new(const char *name, void *ctx);
|
struct stellar_module *stellar_module_new(const char *name, void *ctx);
|
||||||
void stellar_module_free(struct stellar_module *mod);
|
void stellar_module_free(struct stellar_module *mod);
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, c
|
|||||||
// only send user build packet, can't send packet which come from network
|
// only send user build packet, can't send packet which come from network
|
||||||
// void stellar_send_build_packet(struct stellar *st, struct packet *pkt);
|
// void stellar_send_build_packet(struct stellar *st, struct packet *pkt);
|
||||||
|
|
||||||
struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg_file, const char *log_cfg_file);
|
struct stellar *stellar_new(const char *toml_file);
|
||||||
void stellar_run(struct stellar *st);
|
void stellar_run(struct stellar *st);
|
||||||
void stellar_free(struct stellar *st);
|
void stellar_free(struct stellar *st);
|
||||||
void stellar_loopbreak(struct stellar *st);
|
void stellar_loopbreak(struct stellar *st);
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ int main(int argc __attribute__((__unused__)), char **argv __attribute__((__unus
|
|||||||
signal(SIGTERM, signal_handler);
|
signal(SIGTERM, signal_handler);
|
||||||
signal(SIGHUP, signal_handler);
|
signal(SIGHUP, signal_handler);
|
||||||
|
|
||||||
st = stellar_new("./conf/stellar.toml", "./module/spec.toml", "./conf/log.toml");
|
st = stellar_new("./conf/stellar.toml");
|
||||||
if (st == NULL)
|
if (st == NULL)
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
|
|||||||
@@ -7,8 +7,8 @@
|
|||||||
#include "marsio_io.h"
|
#include "marsio_io.h"
|
||||||
#include "log_internal.h"
|
#include "log_internal.h"
|
||||||
|
|
||||||
#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet_io", format, ##__VA_ARGS__)
|
#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet io", format, ##__VA_ARGS__)
|
||||||
#define PACKET_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "packet_io", format, ##__VA_ARGS__)
|
#define PACKET_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "packet io", format, ##__VA_ARGS__)
|
||||||
|
|
||||||
struct packet_io
|
struct packet_io
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -11,6 +11,6 @@ target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash
|
|||||||
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/logger)
|
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/logger)
|
||||||
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/include)
|
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/include)
|
||||||
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra)
|
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra)
|
||||||
target_link_libraries(packet_manager tuple logger dablooms mq exdata)
|
target_link_libraries(packet_manager tuple logger dablooms mq exdata module_manager)
|
||||||
|
|
||||||
add_subdirectory(test)
|
add_subdirectory(test)
|
||||||
@@ -1,7 +1,9 @@
|
|||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
|
||||||
#include "utils.h"
|
|
||||||
#include "stellar/mq.h"
|
#include "stellar/mq.h"
|
||||||
|
#include "stellar/module_manager.h"
|
||||||
|
|
||||||
|
#include "utils.h"
|
||||||
#include "packet_internal.h"
|
#include "packet_internal.h"
|
||||||
#include "packet_manager_internal.h"
|
#include "packet_manager_internal.h"
|
||||||
|
|
||||||
@@ -272,11 +274,14 @@ void packet_manager_free(struct packet_manager *pkt_mgr)
|
|||||||
{
|
{
|
||||||
if (pkt_mgr)
|
if (pkt_mgr)
|
||||||
{
|
{
|
||||||
for (uint16_t i = 0; i < pkt_mgr->cfg->nr_worker_thread; i++)
|
if (pkt_mgr->cfg)
|
||||||
{
|
{
|
||||||
if (pkt_mgr->runtime[i])
|
for (uint16_t i = 0; i < pkt_mgr->cfg->nr_worker_thread; i++)
|
||||||
{
|
{
|
||||||
packet_manager_runtime_free(pkt_mgr->runtime[i]);
|
if (pkt_mgr->runtime[i])
|
||||||
|
{
|
||||||
|
packet_manager_runtime_free(pkt_mgr->runtime[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -300,6 +305,9 @@ int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage s
|
|||||||
|
|
||||||
void packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, struct mq_runtime *mq_rt)
|
void packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, struct mq_runtime *mq_rt)
|
||||||
{
|
{
|
||||||
|
assert(pkt_mgr);
|
||||||
|
assert(thread_id < pkt_mgr->cfg->nr_worker_thread);
|
||||||
|
assert(mq_rt);
|
||||||
struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
|
struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
|
||||||
|
|
||||||
runtime->mq = mq_rt;
|
runtime->mq = mq_rt;
|
||||||
@@ -432,3 +440,46 @@ void packet_manager_print_stat(struct packet_manager *pkt_mgr, uint16_t thread_i
|
|||||||
runtime->stat.queue[i].pkts_schedule);
|
runtime->stat.queue[i].pkts_schedule);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/******************************************************************************
|
||||||
|
* packet manager module
|
||||||
|
******************************************************************************/
|
||||||
|
|
||||||
|
struct stellar_module *packet_manager_on_init(struct stellar_module_manager *mod_mgr)
|
||||||
|
{
|
||||||
|
assert(mod_mgr);
|
||||||
|
struct mq_schema *mq_schema = stellar_module_manager_get_mq_schema(mod_mgr);
|
||||||
|
assert(mq_schema);
|
||||||
|
const char *toml_file = stellar_module_manager_get_toml_path(mod_mgr);
|
||||||
|
assert(toml_file);
|
||||||
|
|
||||||
|
struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, toml_file);
|
||||||
|
if (pkt_mgr == NULL)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct stellar_module *pkt_mgr_mod = stellar_module_new(PACKET_MANAGER_MODULE_NAME, NULL);
|
||||||
|
if (pkt_mgr_mod == NULL)
|
||||||
|
{
|
||||||
|
PACKET_MANAGER_LOG_ERROR("failed to create packet_manager");
|
||||||
|
packet_manager_free(pkt_mgr);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
stellar_module_set_ctx(pkt_mgr_mod, pkt_mgr);
|
||||||
|
|
||||||
|
PACKET_MANAGER_LOG_INFO("packet_manager initialized");
|
||||||
|
return pkt_mgr_mod;
|
||||||
|
}
|
||||||
|
|
||||||
|
void packet_manager_on_exit(struct stellar_module_manager *mod_mgr __attribute__((unused)), struct stellar_module *mod)
|
||||||
|
{
|
||||||
|
if (mod)
|
||||||
|
{
|
||||||
|
struct packet_manager *pkt_mgr = stellar_module_get_ctx(mod);
|
||||||
|
|
||||||
|
packet_manager_free(pkt_mgr);
|
||||||
|
stellar_module_free(mod);
|
||||||
|
PACKET_MANAGER_LOG_INFO("packet_manager exited");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -13,6 +13,4 @@ target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/)
|
|||||||
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/include)
|
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/include)
|
||||||
target_link_libraries(session_manager timeout packet_manager tcp_reassembly mq exdata)
|
target_link_libraries(session_manager timeout packet_manager tcp_reassembly mq exdata)
|
||||||
|
|
||||||
add_subdirectory(test)
|
add_subdirectory(test)
|
||||||
|
|
||||||
install(FILES conf/spec.toml DESTINATION module COMPONENT PROGRAM)
|
|
||||||
@@ -1,4 +0,0 @@
|
|||||||
[[module]]
|
|
||||||
path = "./lib/libstellar.so"
|
|
||||||
init = "session_manager_module_on_init"
|
|
||||||
exit = "session_manager_module_on_exit"
|
|
||||||
@@ -5,6 +5,7 @@
|
|||||||
#include "stellar/packet_manager.h"
|
#include "stellar/packet_manager.h"
|
||||||
#include "stellar/session_manager.h"
|
#include "stellar/session_manager.h"
|
||||||
#include "stellar/module_manager.h"
|
#include "stellar/module_manager.h"
|
||||||
|
#include "stellar/polling_manager.h"
|
||||||
|
|
||||||
#include "utils.h"
|
#include "utils.h"
|
||||||
#include "session_internal.h"
|
#include "session_internal.h"
|
||||||
@@ -13,9 +14,6 @@
|
|||||||
#pragma GCC diagnostic ignored "-Wunused-parameter"
|
#pragma GCC diagnostic ignored "-Wunused-parameter"
|
||||||
#pragma GCC diagnostic ignored "-Wunused-function"
|
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||||
|
|
||||||
#define SESSION_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session manager", format, ##__VA_ARGS__)
|
|
||||||
#define SESSION_MANAGER_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session manager", format, ##__VA_ARGS__)
|
|
||||||
|
|
||||||
struct session_manager_schema
|
struct session_manager_schema
|
||||||
{
|
{
|
||||||
struct exdata_schema *exdata;
|
struct exdata_schema *exdata;
|
||||||
@@ -159,14 +157,8 @@ static void on_packet_output(enum packet_stage stage, struct packet *pkt, void *
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int on_polling(void *args)
|
static void clean_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms)
|
||||||
{
|
{
|
||||||
struct session_manager *sess_mgr = (struct session_manager *)args;
|
|
||||||
struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
|
|
||||||
int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
|
|
||||||
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
|
|
||||||
uint64_t now_ms = clock_get_real_time_ms();
|
|
||||||
|
|
||||||
#define MAX_CLEANED_SESS 1024
|
#define MAX_CLEANED_SESS 1024
|
||||||
struct session *sess = NULL;
|
struct session *sess = NULL;
|
||||||
struct session *cleaned_sess[MAX_CLEANED_SESS] = {NULL};
|
struct session *cleaned_sess[MAX_CLEANED_SESS] = {NULL};
|
||||||
@@ -180,19 +172,21 @@ static int on_polling(void *args)
|
|||||||
exdata_runtime_free(exdata_rt);
|
exdata_runtime_free(exdata_rt);
|
||||||
session_manager_runtime_free_session(sess_mgr_rt, sess);
|
session_manager_runtime_free_session(sess_mgr_rt, sess);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void on_polling(struct stellar_polling_manager *poll_mgr, void *args)
|
||||||
|
{
|
||||||
|
struct session_manager *sess_mgr = (struct session_manager *)args;
|
||||||
|
struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
|
||||||
|
int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
|
||||||
|
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
|
||||||
|
uint64_t now_ms = clock_get_real_time_ms();
|
||||||
|
|
||||||
|
clean_session(sess_mgr_rt, now_ms);
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
// ouput stat to fs4
|
// ouput stat to fs4
|
||||||
session_manager_runtime_print_stat(sess_mgr_rt);
|
// session_manager_runtime_print_stat(sess_mgr_rt);
|
||||||
|
|
||||||
if (used == MAX_CLEANED_SESS)
|
|
||||||
{
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/******************************************************************************
|
/******************************************************************************
|
||||||
@@ -292,10 +286,25 @@ error_out:
|
|||||||
|
|
||||||
void session_manager_free(struct session_manager *sess_mgr)
|
void session_manager_free(struct session_manager *sess_mgr)
|
||||||
{
|
{
|
||||||
|
struct session_manager_stat *stat = NULL;
|
||||||
|
struct session_manager_runtime *sess_mgr_rt = NULL;
|
||||||
|
|
||||||
if (sess_mgr)
|
if (sess_mgr)
|
||||||
{
|
{
|
||||||
for (int i = 0; i < sess_mgr->thread_num; i++)
|
for (int i = 0; i < sess_mgr->thread_num; i++)
|
||||||
{
|
{
|
||||||
|
sess_mgr_rt = sess_mgr->runtime[i];
|
||||||
|
if (sess_mgr_rt == NULL)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
stat = session_manager_runtime_get_stat(sess_mgr_rt);
|
||||||
|
while (stat->tcp_sess_used || stat->udp_sess_used)
|
||||||
|
{
|
||||||
|
clean_session(sess_mgr_rt, UINT64_MAX);
|
||||||
|
}
|
||||||
|
|
||||||
session_manager_runtime_free(sess_mgr->runtime[i]);
|
session_manager_runtime_free(sess_mgr->runtime[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -305,7 +314,7 @@ void session_manager_free(struct session_manager *sess_mgr)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file)
|
struct session_manager *session_manager_new(struct stellar_polling_manager *poll_mgr, struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file)
|
||||||
{
|
{
|
||||||
assert(pkt_mgr);
|
assert(pkt_mgr);
|
||||||
assert(mq_schema);
|
assert(mq_schema);
|
||||||
@@ -319,7 +328,7 @@ struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, stru
|
|||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (load_and_validate_toml_integer_config(toml_file, "packet.nr_worker_thread", (uint64_t *)&thread_num, 0, MAX_THREAD_NUM))
|
if (load_and_validate_toml_integer_config(toml_file, "packet_io.nr_worker_thread", (uint64_t *)&thread_num, 0, MAX_THREAD_NUM))
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -357,6 +366,8 @@ struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, stru
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stellar_polling_subscribe(poll_mgr, on_polling, sess_mgr);
|
||||||
|
|
||||||
return sess_mgr;
|
return sess_mgr;
|
||||||
|
|
||||||
error_out:
|
error_out:
|
||||||
@@ -408,35 +419,40 @@ int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tc
|
|||||||
* session manager module
|
* session manager module
|
||||||
******************************************************************************/
|
******************************************************************************/
|
||||||
|
|
||||||
struct stellar_module *session_manager_module_on_init(struct stellar_module_manager *mod_mgr)
|
struct stellar_module *session_manager_on_init(struct stellar_module_manager *mod_mgr)
|
||||||
{
|
{
|
||||||
assert(mod_mgr);
|
assert(mod_mgr);
|
||||||
|
struct stellar_module *pkt_mgr_mod = stellar_module_manager_get_module(mod_mgr, PACKET_MANAGER_MODULE_NAME);
|
||||||
struct stellar_module *pkt_mgr_mod = stellar_module_manager_get_module(mod_mgr, "packet_manager_module");
|
assert(pkt_mgr_mod);
|
||||||
struct packet_manager *pkt_mgr = stellar_module_get_ctx(pkt_mgr_mod);
|
struct packet_manager *pkt_mgr = stellar_module_get_ctx(pkt_mgr_mod);
|
||||||
|
assert(pkt_mgr);
|
||||||
|
struct stellar_polling_manager *poll_mgr = stellar_module_get_polling_manager(mod_mgr);
|
||||||
|
assert(poll_mgr);
|
||||||
struct mq_schema *mq_schema = stellar_module_manager_get_mq_schema(mod_mgr);
|
struct mq_schema *mq_schema = stellar_module_manager_get_mq_schema(mod_mgr);
|
||||||
|
assert(mq_schema);
|
||||||
const char *toml_file = stellar_module_manager_get_toml_path(mod_mgr);
|
const char *toml_file = stellar_module_manager_get_toml_path(mod_mgr);
|
||||||
|
assert(toml_file);
|
||||||
|
|
||||||
struct session_manager *sess_mgr = session_manager_new(pkt_mgr, mq_schema, toml_file);
|
struct session_manager *sess_mgr = session_manager_new(poll_mgr, pkt_mgr, mq_schema, toml_file);
|
||||||
if (sess_mgr == NULL)
|
if (sess_mgr == NULL)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct stellar_module *sess_mgr_mod = stellar_module_new("session_manager_module", NULL);
|
struct stellar_module *sess_mgr_mod = stellar_module_new(SESSION_MANAGER_MODULE_NAME, NULL);
|
||||||
if (sess_mgr_mod == NULL)
|
if (sess_mgr_mod == NULL)
|
||||||
{
|
{
|
||||||
SESSION_MANAGER_LOG_ERROR("failed to create session_manager_module");
|
SESSION_MANAGER_LOG_ERROR("failed to create session_manager");
|
||||||
session_manager_free(sess_mgr);
|
session_manager_free(sess_mgr);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
stellar_module_set_ctx(sess_mgr_mod, sess_mgr);
|
stellar_module_set_ctx(sess_mgr_mod, sess_mgr);
|
||||||
|
|
||||||
SESSION_MANAGER_LOG_INFO("session_manager_module initialized");
|
SESSION_MANAGER_LOG_INFO("session_manager initialized");
|
||||||
return sess_mgr_mod;
|
return sess_mgr_mod;
|
||||||
}
|
}
|
||||||
|
|
||||||
void session_manager_module_on_exit(struct stellar_module_manager *mod_mgr, struct stellar_module *mod)
|
void session_manager_on_exit(struct stellar_module_manager *mod_mgr __attribute__((unused)), struct stellar_module *mod)
|
||||||
{
|
{
|
||||||
if (mod)
|
if (mod)
|
||||||
{
|
{
|
||||||
@@ -444,6 +460,6 @@ void session_manager_module_on_exit(struct stellar_module_manager *mod_mgr, stru
|
|||||||
|
|
||||||
session_manager_free(sess_mgr);
|
session_manager_free(sess_mgr);
|
||||||
stellar_module_free(mod);
|
stellar_module_free(mod);
|
||||||
SESSION_MANAGER_LOG_ERROR("session_manager_module exited");
|
SESSION_MANAGER_LOG_ERROR("session_manager exited");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -14,10 +14,6 @@
|
|||||||
#include "session_transition.h"
|
#include "session_transition.h"
|
||||||
#include "session_manager_runtime.h"
|
#include "session_manager_runtime.h"
|
||||||
|
|
||||||
#define SESSION_MANAGER_RUNTIME_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__)
|
|
||||||
#define SESSION_MANAGER_RUNTIME_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__)
|
|
||||||
#define SESSION_MANAGER_RUNTIME_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__)
|
|
||||||
|
|
||||||
struct snowflake
|
struct snowflake
|
||||||
{
|
{
|
||||||
uint64_t seed;
|
uint64_t seed;
|
||||||
@@ -207,10 +203,10 @@ static int tcp_init(struct session_manager_runtime *sess_mgr_rt, struct session
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SESSION_MANAGER_RUNTIME_LOG_DEBUG("session %lu %s new c2s tcp assembler %p, s2c tcp assembler %p",
|
SESSION_MANAGER_LOG_DEBUG("session %lu %s new c2s tcp assembler %p, s2c tcp assembler %p",
|
||||||
session_get_id(sess), session_get0_readable_addr(sess),
|
session_get_id(sess), session_get0_readable_addr(sess),
|
||||||
sess->tcp_halfs[FLOW_TYPE_C2S].assembler,
|
sess->tcp_halfs[FLOW_TYPE_C2S].assembler,
|
||||||
sess->tcp_halfs[FLOW_TYPE_S2C].assembler);
|
sess->tcp_halfs[FLOW_TYPE_S2C].assembler);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -536,46 +532,46 @@ void session_manager_config_print(struct session_manager_config *sess_mgr_cfg)
|
|||||||
if (sess_mgr_cfg)
|
if (sess_mgr_cfg)
|
||||||
{
|
{
|
||||||
// max session number
|
// max session number
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_session_max : %lu", sess_mgr_cfg->tcp_session_max);
|
SESSION_MANAGER_LOG_INFO("session_manager.tcp_session_max : %lu", sess_mgr_cfg->tcp_session_max);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_session_max : %lu", sess_mgr_cfg->udp_session_max);
|
SESSION_MANAGER_LOG_INFO("session_manager.udp_session_max : %lu", sess_mgr_cfg->udp_session_max);
|
||||||
|
|
||||||
// session overload
|
// session overload
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evict_old_on_tcp_table_limit : %d", sess_mgr_cfg->evict_old_on_tcp_table_limit);
|
SESSION_MANAGER_LOG_INFO("session_manager.evict_old_on_tcp_table_limit : %d", sess_mgr_cfg->evict_old_on_tcp_table_limit);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evict_old_on_udp_table_limit : %d", sess_mgr_cfg->evict_old_on_udp_table_limit);
|
SESSION_MANAGER_LOG_INFO("session_manager.evict_old_on_udp_table_limit : %d", sess_mgr_cfg->evict_old_on_udp_table_limit);
|
||||||
|
|
||||||
// TCP timeout
|
// TCP timeout
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.init : %lu", sess_mgr_cfg->tcp_timeout_ms.init);
|
SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.init : %lu", sess_mgr_cfg->tcp_timeout_ms.init);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.handshake : %lu", sess_mgr_cfg->tcp_timeout_ms.handshake);
|
SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.handshake : %lu", sess_mgr_cfg->tcp_timeout_ms.handshake);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.data : %lu", sess_mgr_cfg->tcp_timeout_ms.data);
|
SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.data : %lu", sess_mgr_cfg->tcp_timeout_ms.data);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.half_closed : %lu", sess_mgr_cfg->tcp_timeout_ms.half_closed);
|
SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.half_closed : %lu", sess_mgr_cfg->tcp_timeout_ms.half_closed);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.time_wait : %lu", sess_mgr_cfg->tcp_timeout_ms.time_wait);
|
SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.time_wait : %lu", sess_mgr_cfg->tcp_timeout_ms.time_wait);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.discard_default : %lu", sess_mgr_cfg->tcp_timeout_ms.discard_default);
|
SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.discard_default : %lu", sess_mgr_cfg->tcp_timeout_ms.discard_default);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.unverified_rst : %lu", sess_mgr_cfg->tcp_timeout_ms.unverified_rst);
|
SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.unverified_rst : %lu", sess_mgr_cfg->tcp_timeout_ms.unverified_rst);
|
||||||
|
|
||||||
// UDP timeout
|
// UDP timeout
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_timeout_ms.data : %lu", sess_mgr_cfg->udp_timeout_ms.data);
|
SESSION_MANAGER_LOG_INFO("session_manager.udp_timeout_ms.data : %lu", sess_mgr_cfg->udp_timeout_ms.data);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_timeout_ms.discard_default : %lu", sess_mgr_cfg->udp_timeout_ms.discard_default);
|
SESSION_MANAGER_LOG_INFO("session_manager.udp_timeout_ms.discard_default : %lu", sess_mgr_cfg->udp_timeout_ms.discard_default);
|
||||||
|
|
||||||
// limit
|
// limit
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.expire_period_ms : %lu", sess_mgr_cfg->expire_period_ms);
|
SESSION_MANAGER_LOG_INFO("session_manager.expire_period_ms : %lu", sess_mgr_cfg->expire_period_ms);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.expire_batch_max : %lu", sess_mgr_cfg->expire_batch_max);
|
SESSION_MANAGER_LOG_INFO("session_manager.expire_batch_max : %lu", sess_mgr_cfg->expire_batch_max);
|
||||||
|
|
||||||
// duplicated packet filter
|
// duplicated packet filter
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.enable : %d", sess_mgr_cfg->duplicated_packet_bloom_filter.enable);
|
SESSION_MANAGER_LOG_INFO("session_manager.duplicated_packet_bloom_filter.enable : %d", sess_mgr_cfg->duplicated_packet_bloom_filter.enable);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.capacity : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.capacity);
|
SESSION_MANAGER_LOG_INFO("session_manager.duplicated_packet_bloom_filter.capacity : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.capacity);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms);
|
SESSION_MANAGER_LOG_INFO("session_manager.duplicated_packet_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.error_rate : %f", sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate);
|
SESSION_MANAGER_LOG_INFO("session_manager.duplicated_packet_bloom_filter.error_rate : %f", sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate);
|
||||||
|
|
||||||
// eviction session filter
|
// eviction session filter
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.enable : %d", sess_mgr_cfg->evicted_session_bloom_filter.enable);
|
SESSION_MANAGER_LOG_INFO("session_manager.evicted_session_bloom_filter.enable : %d", sess_mgr_cfg->evicted_session_bloom_filter.enable);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.capacity : %lu", sess_mgr_cfg->evicted_session_bloom_filter.capacity);
|
SESSION_MANAGER_LOG_INFO("session_manager.evicted_session_bloom_filter.capacity : %lu", sess_mgr_cfg->evicted_session_bloom_filter.capacity);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms);
|
SESSION_MANAGER_LOG_INFO("session_manager.evicted_session_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.error_rate : %f", sess_mgr_cfg->evicted_session_bloom_filter.error_rate);
|
SESSION_MANAGER_LOG_INFO("session_manager.evicted_session_bloom_filter.error_rate : %f", sess_mgr_cfg->evicted_session_bloom_filter.error_rate);
|
||||||
|
|
||||||
// TCP reassembly
|
// TCP reassembly
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.enable : %d", sess_mgr_cfg->tcp_reassembly.enable);
|
SESSION_MANAGER_LOG_INFO("session_manager.tcp_reassembly.enable : %d", sess_mgr_cfg->tcp_reassembly.enable);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.timeout_ms : %lu", sess_mgr_cfg->tcp_reassembly.timeout_ms);
|
SESSION_MANAGER_LOG_INFO("session_manager.tcp_reassembly.timeout_ms : %lu", sess_mgr_cfg->tcp_reassembly.timeout_ms);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.buffered_segments_max : %lu", sess_mgr_cfg->tcp_reassembly.buffered_segments_max);
|
SESSION_MANAGER_LOG_INFO("session_manager.tcp_reassembly.buffered_segments_max : %lu", sess_mgr_cfg->tcp_reassembly.buffered_segments_max);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -777,13 +773,13 @@ static void session_manager_runtime_evicte_session(struct session_manager_runtim
|
|||||||
switch (session_get_type(sess))
|
switch (session_get_type(sess))
|
||||||
{
|
{
|
||||||
case SESSION_TYPE_TCP:
|
case SESSION_TYPE_TCP:
|
||||||
SESSION_MANAGER_RUNTIME_LOG_DEBUG("evicte tcp old session: %lu", session_get_id(sess));
|
SESSION_MANAGER_LOG_DEBUG("evicte tcp old session: %lu", session_get_id(sess));
|
||||||
session_table_del(sess_mgr_rt->tcp_sess_table, sess);
|
session_table_del(sess_mgr_rt->tcp_sess_table, sess);
|
||||||
SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp);
|
SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp);
|
||||||
sess_mgr_rt->stat.tcp_sess_evicted++;
|
sess_mgr_rt->stat.tcp_sess_evicted++;
|
||||||
break;
|
break;
|
||||||
case SESSION_TYPE_UDP:
|
case SESSION_TYPE_UDP:
|
||||||
SESSION_MANAGER_RUNTIME_LOG_DEBUG("evicte udp old session: %lu", session_get_id(sess));
|
SESSION_MANAGER_LOG_DEBUG("evicte udp old session: %lu", session_get_id(sess));
|
||||||
session_table_del(sess_mgr_rt->udp_sess_table, sess);
|
session_table_del(sess_mgr_rt->udp_sess_table, sess);
|
||||||
if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable)
|
if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable)
|
||||||
{
|
{
|
||||||
@@ -1064,7 +1060,7 @@ void session_manager_runtime_free_session(struct session_manager_runtime *sess_m
|
|||||||
{
|
{
|
||||||
if (sess)
|
if (sess)
|
||||||
{
|
{
|
||||||
SESSION_MANAGER_RUNTIME_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess)));
|
SESSION_MANAGER_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess)));
|
||||||
|
|
||||||
session_timer_del(sess_mgr_rt->sess_timer, sess);
|
session_timer_del(sess_mgr_rt->sess_timer, sess);
|
||||||
switch (session_get_type(sess))
|
switch (session_get_type(sess))
|
||||||
@@ -1241,7 +1237,8 @@ uint64_t session_manager_runtime_clean_session(struct session_manager_runtime *s
|
|||||||
uint64_t expired_sess_num = 0;
|
uint64_t expired_sess_num = 0;
|
||||||
|
|
||||||
uint8_t expired_sess_canbe_clean = 0;
|
uint8_t expired_sess_canbe_clean = 0;
|
||||||
if (now_ms - sess_mgr_rt->last_clean_expired_sess_ts >= sess_mgr_rt->cfg.expire_period_ms)
|
if (now_ms - sess_mgr_rt->last_clean_expired_sess_ts >= sess_mgr_rt->cfg.expire_period_ms ||
|
||||||
|
now_ms == UINT64_MAX)
|
||||||
{
|
{
|
||||||
expired_sess_canbe_clean = 1;
|
expired_sess_canbe_clean = 1;
|
||||||
}
|
}
|
||||||
@@ -1295,25 +1292,25 @@ void session_manager_runtime_print_stat(struct session_manager_runtime *sess_mgr
|
|||||||
struct session_manager_stat *stat = &sess_mgr_rt->stat;
|
struct session_manager_stat *stat = &sess_mgr_rt->stat;
|
||||||
|
|
||||||
// TCP session
|
// TCP session
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("TCP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu",
|
SESSION_MANAGER_LOG_INFO("TCP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu",
|
||||||
stat->history_tcp_sessions, stat->tcp_sess_used, stat->tcp_sess_opening, stat->tcp_sess_active,
|
stat->history_tcp_sessions, stat->tcp_sess_used, stat->tcp_sess_opening, stat->tcp_sess_active,
|
||||||
stat->tcp_sess_closing, stat->tcp_sess_discard, stat->tcp_sess_closed);
|
stat->tcp_sess_closing, stat->tcp_sess_discard, stat->tcp_sess_closed);
|
||||||
// UDP session
|
// UDP session
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("UDP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu",
|
SESSION_MANAGER_LOG_INFO("UDP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu",
|
||||||
stat->history_udp_sessions, stat->udp_sess_used, stat->udp_sess_opening, stat->udp_sess_active,
|
stat->history_udp_sessions, stat->udp_sess_used, stat->udp_sess_opening, stat->udp_sess_active,
|
||||||
stat->udp_sess_closing, stat->udp_sess_discard, stat->udp_sess_closed);
|
stat->udp_sess_closing, stat->udp_sess_discard, stat->udp_sess_closed);
|
||||||
// evicted session
|
// evicted session
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("evicted session: TCP=%lu, UDP=%lu", stat->tcp_sess_evicted, stat->udp_sess_evicted);
|
SESSION_MANAGER_LOG_INFO("evicted session: TCP=%lu, UDP=%lu", stat->tcp_sess_evicted, stat->udp_sess_evicted);
|
||||||
// Bypassed packet
|
// Bypassed packet
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("bypassed TCP packet: table_full=%lu, session_not_found=%lu, duplicated=%lu",
|
SESSION_MANAGER_LOG_INFO("bypassed TCP packet: table_full=%lu, session_not_found=%lu, duplicated=%lu",
|
||||||
stat->tcp_pkts_bypass_table_full, stat->tcp_pkts_bypass_session_not_found, stat->tcp_pkts_bypass_duplicated);
|
stat->tcp_pkts_bypass_table_full, stat->tcp_pkts_bypass_session_not_found, stat->tcp_pkts_bypass_duplicated);
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("bypassed UDP packet: table_full=%lu, session_evicted=%lu, duplicated=%lu",
|
SESSION_MANAGER_LOG_INFO("bypassed UDP packet: table_full=%lu, session_evicted=%lu, duplicated=%lu",
|
||||||
stat->udp_pkts_bypass_table_full, stat->udp_pkts_bypass_session_evicted, stat->udp_pkts_bypass_duplicated);
|
stat->udp_pkts_bypass_table_full, stat->udp_pkts_bypass_session_evicted, stat->udp_pkts_bypass_duplicated);
|
||||||
// TCP segment
|
// TCP segment
|
||||||
SESSION_MANAGER_RUNTIME_LOG_INFO("TCP segment: input=%lu, consumed=%lu, timeout=%lu, retransmited=%lu, overlapped=%lu, omitted_too_many=%lu, inorder=%lu, reordered=%lu, buffered=%lu, freed=%lu",
|
SESSION_MANAGER_LOG_INFO("TCP segment: input=%lu, consumed=%lu, timeout=%lu, retransmited=%lu, overlapped=%lu, omitted_too_many=%lu, inorder=%lu, reordered=%lu, buffered=%lu, freed=%lu",
|
||||||
stat->tcp_segs_input, stat->tcp_segs_consumed, stat->tcp_segs_timeout, stat->tcp_segs_retransmited,
|
stat->tcp_segs_input, stat->tcp_segs_consumed, stat->tcp_segs_timeout, stat->tcp_segs_retransmited,
|
||||||
stat->tcp_segs_overlapped, stat->tcp_segs_omitted_too_many, stat->tcp_segs_inorder, stat->tcp_segs_reordered,
|
stat->tcp_segs_overlapped, stat->tcp_segs_omitted_too_many, stat->tcp_segs_inorder, stat->tcp_segs_reordered,
|
||||||
stat->tcp_segs_buffered, stat->tcp_segs_freed);
|
stat->tcp_segs_buffered, stat->tcp_segs_freed);
|
||||||
}
|
}
|
||||||
|
|
||||||
/******************************************************************************
|
/******************************************************************************
|
||||||
@@ -1428,7 +1425,7 @@ uint64_t session_manager_runtime_scan(const struct session_manager_runtime *sess
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SESSION_MANAGER_RUNTIME_LOG_DEBUG("session scan: cursor=%lu, count=%lu, mached_sess_num=%lu", opts->cursor, opts->count, mached_sess_num);
|
SESSION_MANAGER_LOG_DEBUG("session scan: cursor=%lu, count=%lu, mached_sess_num=%lu", opts->cursor, opts->count, mached_sess_num);
|
||||||
return mached_sess_num;
|
return mached_sess_num;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,10 @@ extern "C"
|
|||||||
#include "tuple.h"
|
#include "tuple.h"
|
||||||
#include "stellar/session.h"
|
#include "stellar/session.h"
|
||||||
|
|
||||||
|
#define SESSION_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session manager", format, ##__VA_ARGS__)
|
||||||
|
#define SESSION_MANAGER_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session manager", format, ##__VA_ARGS__)
|
||||||
|
#define SESSION_MANAGER_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session manager", format, ##__VA_ARGS__)
|
||||||
|
|
||||||
struct session_manager_config
|
struct session_manager_config
|
||||||
{
|
{
|
||||||
uint64_t session_id_seed;
|
uint64_t session_id_seed;
|
||||||
|
|||||||
@@ -5,9 +5,10 @@
|
|||||||
#include "packet_io.h"
|
#include "packet_io.h"
|
||||||
#include "packet_internal.h"
|
#include "packet_internal.h"
|
||||||
#include "packet_manager_internal.h"
|
#include "packet_manager_internal.h"
|
||||||
|
#include "polling_manager_internal.h"
|
||||||
|
|
||||||
#include "stellar/stellar.h"
|
#include "stellar/stellar.h"
|
||||||
#include "stellar/module_manager.h"
|
#include "stellar/module_manager.h"
|
||||||
#include "polling_manager_internal.h"
|
|
||||||
|
|
||||||
#define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__)
|
#define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__)
|
||||||
#define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__)
|
#define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__)
|
||||||
@@ -34,7 +35,6 @@ struct stellar
|
|||||||
struct logger *logger;
|
struct logger *logger;
|
||||||
struct packet_io *pkt_io;
|
struct packet_io *pkt_io;
|
||||||
struct mq_schema *mq_schema;
|
struct mq_schema *mq_schema;
|
||||||
struct packet_manager *pkt_mgr;
|
|
||||||
struct stellar_module_manager *mod_mgr;
|
struct stellar_module_manager *mod_mgr;
|
||||||
struct stellar_thread threads[MAX_THREAD_NUM];
|
struct stellar_thread threads[MAX_THREAD_NUM];
|
||||||
};
|
};
|
||||||
@@ -49,12 +49,11 @@ static void *worker_thread(void *arg)
|
|||||||
uint16_t thread_id = thread->idx;
|
uint16_t thread_id = thread->idx;
|
||||||
struct stellar *st = thread->st;
|
struct stellar *st = thread->st;
|
||||||
struct packet_io *pkt_io = st->pkt_io;
|
struct packet_io *pkt_io = st->pkt_io;
|
||||||
struct packet_manager *pkt_mgr = st->pkt_mgr;
|
|
||||||
struct stellar_module_manager *mod_mgr = st->mod_mgr;
|
struct stellar_module_manager *mod_mgr = st->mod_mgr;
|
||||||
struct mq_runtime *mq_rt = mq_runtime_new(st->mq_schema);
|
struct mq_runtime *mq_rt = mq_runtime_new(st->mq_schema);
|
||||||
struct stellar_polling_manager *polling_mgr=stellar_module_get_polling_manager(mod_mgr);
|
struct stellar_polling_manager *polling_mgr = stellar_module_get_polling_manager(mod_mgr);
|
||||||
|
struct stellar_module *pkt_mgr_mod = stellar_module_manager_get_module(mod_mgr, PACKET_MANAGER_MODULE_NAME);
|
||||||
|
struct packet_manager *pkt_mgr = stellar_module_get_ctx(pkt_mgr_mod);
|
||||||
|
|
||||||
snprintf(thread_name, sizeof(thread_name), "stellar:%d", thread_id);
|
snprintf(thread_name, sizeof(thread_name), "stellar:%d", thread_id);
|
||||||
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);
|
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);
|
||||||
@@ -110,7 +109,7 @@ static void *worker_thread(void *arg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
idle_tasks:
|
idle_tasks:
|
||||||
stellar_polling_dispatch(polling_mgr);
|
stellar_polling_dispatch(polling_mgr);
|
||||||
|
|
||||||
if (nr_pkt_rcv == 0)
|
if (nr_pkt_rcv == 0)
|
||||||
{
|
{
|
||||||
@@ -160,23 +159,13 @@ static void stellar_thread_join(struct stellar *st)
|
|||||||
CORE_LOG_FATAL("all worker thread exited");
|
CORE_LOG_FATAL("all worker thread exited");
|
||||||
}
|
}
|
||||||
|
|
||||||
struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg_file, const char *log_cfg_file)
|
struct stellar *stellar_new(const char *toml_file)
|
||||||
{
|
{
|
||||||
if (stellar_cfg_file == NULL)
|
if (toml_file == NULL)
|
||||||
{
|
{
|
||||||
printf("stellar config file is null\n");
|
printf("stellar config file is null\n");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (module_cfg_file == NULL)
|
|
||||||
{
|
|
||||||
printf("module config file is null\n");
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
if (log_cfg_file == NULL)
|
|
||||||
{
|
|
||||||
printf("log config file is null\n");
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct stellar *st = (struct stellar *)calloc(1, sizeof(struct stellar));
|
struct stellar *st = (struct stellar *)calloc(1, sizeof(struct stellar));
|
||||||
if (st == NULL)
|
if (st == NULL)
|
||||||
@@ -184,7 +173,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
st->logger = log_new(log_cfg_file);
|
st->logger = log_new(toml_file);
|
||||||
if (st->logger == NULL)
|
if (st->logger == NULL)
|
||||||
{
|
{
|
||||||
printf("unable to create logger");
|
printf("unable to create logger");
|
||||||
@@ -194,7 +183,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg
|
|||||||
__thread_local_logger = st->logger;
|
__thread_local_logger = st->logger;
|
||||||
CORE_LOG_FATAL("stellar start (version: %s)", version);
|
CORE_LOG_FATAL("stellar start (version: %s)", version);
|
||||||
|
|
||||||
if (load_and_validate_toml_integer_config(stellar_cfg_file, "packet_io.nr_worker_thread", (uint64_t *)&st->thread_num, 1, MAX_THREAD_NUM) != 0)
|
if (load_and_validate_toml_integer_config(toml_file, "packet_io.nr_worker_thread", (uint64_t *)&st->thread_num, 1, MAX_THREAD_NUM) != 0)
|
||||||
{
|
{
|
||||||
CORE_LOG_ERROR("unable to get thread number from config file");
|
CORE_LOG_ERROR("unable to get thread number from config file");
|
||||||
goto error_out;
|
goto error_out;
|
||||||
@@ -207,21 +196,14 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg
|
|||||||
goto error_out;
|
goto error_out;
|
||||||
}
|
}
|
||||||
|
|
||||||
st->pkt_mgr = packet_manager_new(st->mq_schema, stellar_cfg_file);
|
st->mod_mgr = stellar_module_manager_new(toml_file, st->thread_num, st->mq_schema, st->logger);
|
||||||
if (st->pkt_mgr == NULL)
|
|
||||||
{
|
|
||||||
CORE_LOG_ERROR("unable to create packet manager");
|
|
||||||
goto error_out;
|
|
||||||
}
|
|
||||||
|
|
||||||
st->mod_mgr = stellar_module_manager_new(module_cfg_file, st->thread_num, st->mq_schema, st->logger);
|
|
||||||
if (st->mod_mgr == NULL)
|
if (st->mod_mgr == NULL)
|
||||||
{
|
{
|
||||||
CORE_LOG_ERROR("unable to create module manager");
|
CORE_LOG_ERROR("unable to create module manager");
|
||||||
goto error_out;
|
goto error_out;
|
||||||
}
|
}
|
||||||
|
|
||||||
st->pkt_io = packet_io_new(stellar_cfg_file);
|
st->pkt_io = packet_io_new(toml_file);
|
||||||
if (st->pkt_io == NULL)
|
if (st->pkt_io == NULL)
|
||||||
{
|
{
|
||||||
CORE_LOG_ERROR("unable to create packet io");
|
CORE_LOG_ERROR("unable to create packet io");
|
||||||
@@ -273,7 +255,6 @@ void stellar_free(struct stellar *st)
|
|||||||
|
|
||||||
packet_io_free(st->pkt_io);
|
packet_io_free(st->pkt_io);
|
||||||
stellar_module_manager_free(st->mod_mgr);
|
stellar_module_manager_free(st->mod_mgr);
|
||||||
packet_manager_free(st->pkt_mgr);
|
|
||||||
mq_schema_free(st->mq_schema);
|
mq_schema_free(st->mq_schema);
|
||||||
|
|
||||||
CORE_LOG_FATAL("stellar exit\n");
|
CORE_LOG_FATAL("stellar exit\n");
|
||||||
|
|||||||
@@ -52,8 +52,14 @@ global:
|
|||||||
log_print;
|
log_print;
|
||||||
log_check_level;
|
log_check_level;
|
||||||
|
|
||||||
session_manager_module_on_init;
|
polling_manager_on_init;
|
||||||
session_manager_module_on_exit;
|
polling_manager_on_exit;
|
||||||
|
|
||||||
|
packet_manager_on_init;
|
||||||
|
packet_manager_on_exit;
|
||||||
|
|
||||||
|
session_manager_on_init;
|
||||||
|
session_manager_on_exit;
|
||||||
|
|
||||||
http_message_*;
|
http_message_*;
|
||||||
http_decoder_init;
|
http_decoder_init;
|
||||||
|
|||||||
Reference in New Issue
Block a user