diff --git a/conf/CMakeLists.txt b/conf/CMakeLists.txt index 5040b1e..d4c645b 100644 --- a/conf/CMakeLists.txt +++ b/conf/CMakeLists.txt @@ -1,2 +1 @@ -install(FILES stellar.toml DESTINATION conf COMPONENT PROGRAM) -install(FILES log.toml DESTINATION conf COMPONENT PROGRAM) \ No newline at end of file +install(FILES stellar.toml DESTINATION conf COMPONENT PROGRAM) \ No newline at end of file diff --git a/conf/log.toml b/conf/log.toml deleted file mode 100644 index 6df9dc6..0000000 --- a/conf/log.toml +++ /dev/null @@ -1,4 +0,0 @@ -[log] -output = "both" # stderr, file, both -file = "log/stellar.log" -level = "INFO" # TRACE, DEBUG, INFO, WARN, ERROR, FATAL diff --git a/conf/stellar.toml b/conf/stellar.toml index f5d4040..b71d869 100644 --- a/conf/stellar.toml +++ b/conf/stellar.toml @@ -61,4 +61,24 @@ [stat] merge_interval_ms = 500 # range: [0, 60000] (ms) - output_interval_ms = 1000 # range: [0, 60000] (ms) \ No newline at end of file + output_interval_ms = 1000 # range: [0, 60000] (ms) + +[log] + output = "both" # stderr, file, both + file = "log/stellar.log" + level = "INFO" # TRACE, DEBUG, INFO, WARN, ERROR, FATAL + +[[module]] +path = "" +init = "polling_manager_on_init" +exit = "polling_manager_on_exit" + +[[module]] +path = "" +init = "packet_manager_on_init" +exit = "packet_manager_on_exit" + +[[module]] +path = "" +init = "session_manager_on_init" +exit = "session_manager_on_exit" \ No newline at end of file diff --git a/include/stellar/module_manager.h b/include/stellar/module_manager.h index 70ee5a5..a944186 100644 --- a/include/stellar/module_manager.h +++ b/include/stellar/module_manager.h @@ -8,6 +8,9 @@ extern "C" #include "stellar/mq.h" #include "stellar/log.h" +#define PACKET_MANAGER_MODULE_NAME "packet_manager_module" +#define SESSION_MANAGER_MODULE_NAME "session_manager_module" + struct stellar_module; struct stellar_module *stellar_module_new(const char *name, void *ctx); void stellar_module_free(struct stellar_module *mod); diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index e03b791..2f103a3 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -23,7 +23,7 @@ void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, c // only send user build packet, can't send packet which come from network // void stellar_send_build_packet(struct stellar *st, struct packet *pkt); -struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg_file, const char *log_cfg_file); +struct stellar *stellar_new(const char *toml_file); void stellar_run(struct stellar *st); void stellar_free(struct stellar *st); void stellar_loopbreak(struct stellar *st); diff --git a/infra/main.c b/infra/main.c index 171ef09..a9efe12 100644 --- a/infra/main.c +++ b/infra/main.c @@ -37,7 +37,7 @@ int main(int argc __attribute__((__unused__)), char **argv __attribute__((__unus signal(SIGTERM, signal_handler); signal(SIGHUP, signal_handler); - st = stellar_new("./conf/stellar.toml", "./module/spec.toml", "./conf/log.toml"); + st = stellar_new("./conf/stellar.toml"); if (st == NULL) { return 0; diff --git a/infra/packet_io/packet_io.c b/infra/packet_io/packet_io.c index 70d0599..45eda02 100644 --- a/infra/packet_io/packet_io.c +++ b/infra/packet_io/packet_io.c @@ -7,8 +7,8 @@ #include "marsio_io.h" #include "log_internal.h" -#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet_io", format, ##__VA_ARGS__) -#define PACKET_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "packet_io", format, ##__VA_ARGS__) +#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet io", format, ##__VA_ARGS__) +#define PACKET_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "packet io", format, ##__VA_ARGS__) struct packet_io { diff --git a/infra/packet_manager/CMakeLists.txt b/infra/packet_manager/CMakeLists.txt index 24cae44..9b9c132 100644 --- a/infra/packet_manager/CMakeLists.txt +++ b/infra/packet_manager/CMakeLists.txt @@ -11,6 +11,6 @@ target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/logger) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/include) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra) -target_link_libraries(packet_manager tuple logger dablooms mq exdata) +target_link_libraries(packet_manager tuple logger dablooms mq exdata module_manager) add_subdirectory(test) \ No newline at end of file diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c index 2c422d3..d9e5305 100644 --- a/infra/packet_manager/packet_manager.c +++ b/infra/packet_manager/packet_manager.c @@ -1,7 +1,9 @@ #include -#include "utils.h" #include "stellar/mq.h" +#include "stellar/module_manager.h" + +#include "utils.h" #include "packet_internal.h" #include "packet_manager_internal.h" @@ -272,11 +274,14 @@ void packet_manager_free(struct packet_manager *pkt_mgr) { if (pkt_mgr) { - for (uint16_t i = 0; i < pkt_mgr->cfg->nr_worker_thread; i++) + if (pkt_mgr->cfg) { - if (pkt_mgr->runtime[i]) + for (uint16_t i = 0; i < pkt_mgr->cfg->nr_worker_thread; i++) { - packet_manager_runtime_free(pkt_mgr->runtime[i]); + if (pkt_mgr->runtime[i]) + { + packet_manager_runtime_free(pkt_mgr->runtime[i]); + } } } @@ -300,6 +305,9 @@ int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage s void packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, struct mq_runtime *mq_rt) { + assert(pkt_mgr); + assert(thread_id < pkt_mgr->cfg->nr_worker_thread); + assert(mq_rt); struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; runtime->mq = mq_rt; @@ -432,3 +440,46 @@ void packet_manager_print_stat(struct packet_manager *pkt_mgr, uint16_t thread_i runtime->stat.queue[i].pkts_schedule); } } + +/****************************************************************************** + * packet manager module + ******************************************************************************/ + +struct stellar_module *packet_manager_on_init(struct stellar_module_manager *mod_mgr) +{ + assert(mod_mgr); + struct mq_schema *mq_schema = stellar_module_manager_get_mq_schema(mod_mgr); + assert(mq_schema); + const char *toml_file = stellar_module_manager_get_toml_path(mod_mgr); + assert(toml_file); + + struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, toml_file); + if (pkt_mgr == NULL) + { + return NULL; + } + + struct stellar_module *pkt_mgr_mod = stellar_module_new(PACKET_MANAGER_MODULE_NAME, NULL); + if (pkt_mgr_mod == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create packet_manager"); + packet_manager_free(pkt_mgr); + return NULL; + } + stellar_module_set_ctx(pkt_mgr_mod, pkt_mgr); + + PACKET_MANAGER_LOG_INFO("packet_manager initialized"); + return pkt_mgr_mod; +} + +void packet_manager_on_exit(struct stellar_module_manager *mod_mgr __attribute__((unused)), struct stellar_module *mod) +{ + if (mod) + { + struct packet_manager *pkt_mgr = stellar_module_get_ctx(mod); + + packet_manager_free(pkt_mgr); + stellar_module_free(mod); + PACKET_MANAGER_LOG_INFO("packet_manager exited"); + } +} \ No newline at end of file diff --git a/infra/session_manager/CMakeLists.txt b/infra/session_manager/CMakeLists.txt index 9e9091e..a045740 100644 --- a/infra/session_manager/CMakeLists.txt +++ b/infra/session_manager/CMakeLists.txt @@ -13,6 +13,4 @@ target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/) target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/include) target_link_libraries(session_manager timeout packet_manager tcp_reassembly mq exdata) -add_subdirectory(test) - -install(FILES conf/spec.toml DESTINATION module COMPONENT PROGRAM) \ No newline at end of file +add_subdirectory(test) \ No newline at end of file diff --git a/infra/session_manager/conf/spec.toml b/infra/session_manager/conf/spec.toml deleted file mode 100644 index 315f02a..0000000 --- a/infra/session_manager/conf/spec.toml +++ /dev/null @@ -1,4 +0,0 @@ -[[module]] -path = "./lib/libstellar.so" -init = "session_manager_module_on_init" -exit = "session_manager_module_on_exit" diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index 735bc4f..b4a7b17 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -5,6 +5,7 @@ #include "stellar/packet_manager.h" #include "stellar/session_manager.h" #include "stellar/module_manager.h" +#include "stellar/polling_manager.h" #include "utils.h" #include "session_internal.h" @@ -13,9 +14,6 @@ #pragma GCC diagnostic ignored "-Wunused-parameter" #pragma GCC diagnostic ignored "-Wunused-function" -#define SESSION_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session manager", format, ##__VA_ARGS__) -#define SESSION_MANAGER_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session manager", format, ##__VA_ARGS__) - struct session_manager_schema { struct exdata_schema *exdata; @@ -159,14 +157,8 @@ static void on_packet_output(enum packet_stage stage, struct packet *pkt, void * } } -static int on_polling(void *args) +static void clean_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms) { - struct session_manager *sess_mgr = (struct session_manager *)args; - struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr; - int thread_id = stellar_module_manager_get_thread_id(mod_mgr); - struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; - uint64_t now_ms = clock_get_real_time_ms(); - #define MAX_CLEANED_SESS 1024 struct session *sess = NULL; struct session *cleaned_sess[MAX_CLEANED_SESS] = {NULL}; @@ -180,19 +172,21 @@ static int on_polling(void *args) exdata_runtime_free(exdata_rt); session_manager_runtime_free_session(sess_mgr_rt, sess); } +} + +static void on_polling(struct stellar_polling_manager *poll_mgr, void *args) +{ + struct session_manager *sess_mgr = (struct session_manager *)args; + struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr; + int thread_id = stellar_module_manager_get_thread_id(mod_mgr); + struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; + uint64_t now_ms = clock_get_real_time_ms(); + + clean_session(sess_mgr_rt, now_ms); // TODO // ouput stat to fs4 - session_manager_runtime_print_stat(sess_mgr_rt); - - if (used == MAX_CLEANED_SESS) - { - return 1; - } - else - { - return 0; - } + // session_manager_runtime_print_stat(sess_mgr_rt); } /****************************************************************************** @@ -292,10 +286,25 @@ error_out: void session_manager_free(struct session_manager *sess_mgr) { + struct session_manager_stat *stat = NULL; + struct session_manager_runtime *sess_mgr_rt = NULL; + if (sess_mgr) { for (int i = 0; i < sess_mgr->thread_num; i++) { + sess_mgr_rt = sess_mgr->runtime[i]; + if (sess_mgr_rt == NULL) + { + continue; + } + + stat = session_manager_runtime_get_stat(sess_mgr_rt); + while (stat->tcp_sess_used || stat->udp_sess_used) + { + clean_session(sess_mgr_rt, UINT64_MAX); + } + session_manager_runtime_free(sess_mgr->runtime[i]); } @@ -305,7 +314,7 @@ void session_manager_free(struct session_manager *sess_mgr) } } -struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file) +struct session_manager *session_manager_new(struct stellar_polling_manager *poll_mgr, struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file) { assert(pkt_mgr); assert(mq_schema); @@ -319,7 +328,7 @@ struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, stru { return NULL; } - if (load_and_validate_toml_integer_config(toml_file, "packet.nr_worker_thread", (uint64_t *)&thread_num, 0, MAX_THREAD_NUM)) + if (load_and_validate_toml_integer_config(toml_file, "packet_io.nr_worker_thread", (uint64_t *)&thread_num, 0, MAX_THREAD_NUM)) { return NULL; } @@ -357,6 +366,8 @@ struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, stru } } + stellar_polling_subscribe(poll_mgr, on_polling, sess_mgr); + return sess_mgr; error_out: @@ -408,35 +419,40 @@ int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tc * session manager module ******************************************************************************/ -struct stellar_module *session_manager_module_on_init(struct stellar_module_manager *mod_mgr) +struct stellar_module *session_manager_on_init(struct stellar_module_manager *mod_mgr) { assert(mod_mgr); - - struct stellar_module *pkt_mgr_mod = stellar_module_manager_get_module(mod_mgr, "packet_manager_module"); + struct stellar_module *pkt_mgr_mod = stellar_module_manager_get_module(mod_mgr, PACKET_MANAGER_MODULE_NAME); + assert(pkt_mgr_mod); struct packet_manager *pkt_mgr = stellar_module_get_ctx(pkt_mgr_mod); + assert(pkt_mgr); + struct stellar_polling_manager *poll_mgr = stellar_module_get_polling_manager(mod_mgr); + assert(poll_mgr); struct mq_schema *mq_schema = stellar_module_manager_get_mq_schema(mod_mgr); + assert(mq_schema); const char *toml_file = stellar_module_manager_get_toml_path(mod_mgr); + assert(toml_file); - struct session_manager *sess_mgr = session_manager_new(pkt_mgr, mq_schema, toml_file); + struct session_manager *sess_mgr = session_manager_new(poll_mgr, pkt_mgr, mq_schema, toml_file); if (sess_mgr == NULL) { return NULL; } - struct stellar_module *sess_mgr_mod = stellar_module_new("session_manager_module", NULL); + struct stellar_module *sess_mgr_mod = stellar_module_new(SESSION_MANAGER_MODULE_NAME, NULL); if (sess_mgr_mod == NULL) { - SESSION_MANAGER_LOG_ERROR("failed to create session_manager_module"); + SESSION_MANAGER_LOG_ERROR("failed to create session_manager"); session_manager_free(sess_mgr); return NULL; } stellar_module_set_ctx(sess_mgr_mod, sess_mgr); - SESSION_MANAGER_LOG_INFO("session_manager_module initialized"); + SESSION_MANAGER_LOG_INFO("session_manager initialized"); return sess_mgr_mod; } -void session_manager_module_on_exit(struct stellar_module_manager *mod_mgr, struct stellar_module *mod) +void session_manager_on_exit(struct stellar_module_manager *mod_mgr __attribute__((unused)), struct stellar_module *mod) { if (mod) { @@ -444,6 +460,6 @@ void session_manager_module_on_exit(struct stellar_module_manager *mod_mgr, stru session_manager_free(sess_mgr); stellar_module_free(mod); - SESSION_MANAGER_LOG_ERROR("session_manager_module exited"); + SESSION_MANAGER_LOG_ERROR("session_manager exited"); } } \ No newline at end of file diff --git a/infra/session_manager/session_manager_runtime.c b/infra/session_manager/session_manager_runtime.c index f7786a1..2ea160c 100644 --- a/infra/session_manager/session_manager_runtime.c +++ b/infra/session_manager/session_manager_runtime.c @@ -14,10 +14,6 @@ #include "session_transition.h" #include "session_manager_runtime.h" -#define SESSION_MANAGER_RUNTIME_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__) -#define SESSION_MANAGER_RUNTIME_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__) -#define SESSION_MANAGER_RUNTIME_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__) - struct snowflake { uint64_t seed; @@ -207,10 +203,10 @@ static int tcp_init(struct session_manager_runtime *sess_mgr_rt, struct session return -1; } - SESSION_MANAGER_RUNTIME_LOG_DEBUG("session %lu %s new c2s tcp assembler %p, s2c tcp assembler %p", - session_get_id(sess), session_get0_readable_addr(sess), - sess->tcp_halfs[FLOW_TYPE_C2S].assembler, - sess->tcp_halfs[FLOW_TYPE_S2C].assembler); + SESSION_MANAGER_LOG_DEBUG("session %lu %s new c2s tcp assembler %p, s2c tcp assembler %p", + session_get_id(sess), session_get0_readable_addr(sess), + sess->tcp_halfs[FLOW_TYPE_C2S].assembler, + sess->tcp_halfs[FLOW_TYPE_S2C].assembler); return 0; } @@ -536,46 +532,46 @@ void session_manager_config_print(struct session_manager_config *sess_mgr_cfg) if (sess_mgr_cfg) { // max session number - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_session_max : %lu", sess_mgr_cfg->tcp_session_max); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_session_max : %lu", sess_mgr_cfg->udp_session_max); + SESSION_MANAGER_LOG_INFO("session_manager.tcp_session_max : %lu", sess_mgr_cfg->tcp_session_max); + SESSION_MANAGER_LOG_INFO("session_manager.udp_session_max : %lu", sess_mgr_cfg->udp_session_max); // session overload - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evict_old_on_tcp_table_limit : %d", sess_mgr_cfg->evict_old_on_tcp_table_limit); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evict_old_on_udp_table_limit : %d", sess_mgr_cfg->evict_old_on_udp_table_limit); + SESSION_MANAGER_LOG_INFO("session_manager.evict_old_on_tcp_table_limit : %d", sess_mgr_cfg->evict_old_on_tcp_table_limit); + SESSION_MANAGER_LOG_INFO("session_manager.evict_old_on_udp_table_limit : %d", sess_mgr_cfg->evict_old_on_udp_table_limit); // TCP timeout - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.init : %lu", sess_mgr_cfg->tcp_timeout_ms.init); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.handshake : %lu", sess_mgr_cfg->tcp_timeout_ms.handshake); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.data : %lu", sess_mgr_cfg->tcp_timeout_ms.data); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.half_closed : %lu", sess_mgr_cfg->tcp_timeout_ms.half_closed); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.time_wait : %lu", sess_mgr_cfg->tcp_timeout_ms.time_wait); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.discard_default : %lu", sess_mgr_cfg->tcp_timeout_ms.discard_default); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.unverified_rst : %lu", sess_mgr_cfg->tcp_timeout_ms.unverified_rst); + SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.init : %lu", sess_mgr_cfg->tcp_timeout_ms.init); + SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.handshake : %lu", sess_mgr_cfg->tcp_timeout_ms.handshake); + SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.data : %lu", sess_mgr_cfg->tcp_timeout_ms.data); + SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.half_closed : %lu", sess_mgr_cfg->tcp_timeout_ms.half_closed); + SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.time_wait : %lu", sess_mgr_cfg->tcp_timeout_ms.time_wait); + SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.discard_default : %lu", sess_mgr_cfg->tcp_timeout_ms.discard_default); + SESSION_MANAGER_LOG_INFO("session_manager.tcp_timeout_ms.unverified_rst : %lu", sess_mgr_cfg->tcp_timeout_ms.unverified_rst); // UDP timeout - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_timeout_ms.data : %lu", sess_mgr_cfg->udp_timeout_ms.data); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_timeout_ms.discard_default : %lu", sess_mgr_cfg->udp_timeout_ms.discard_default); + SESSION_MANAGER_LOG_INFO("session_manager.udp_timeout_ms.data : %lu", sess_mgr_cfg->udp_timeout_ms.data); + SESSION_MANAGER_LOG_INFO("session_manager.udp_timeout_ms.discard_default : %lu", sess_mgr_cfg->udp_timeout_ms.discard_default); // limit - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.expire_period_ms : %lu", sess_mgr_cfg->expire_period_ms); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.expire_batch_max : %lu", sess_mgr_cfg->expire_batch_max); + SESSION_MANAGER_LOG_INFO("session_manager.expire_period_ms : %lu", sess_mgr_cfg->expire_period_ms); + SESSION_MANAGER_LOG_INFO("session_manager.expire_batch_max : %lu", sess_mgr_cfg->expire_batch_max); // duplicated packet filter - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.enable : %d", sess_mgr_cfg->duplicated_packet_bloom_filter.enable); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.capacity : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.capacity); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.error_rate : %f", sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate); + SESSION_MANAGER_LOG_INFO("session_manager.duplicated_packet_bloom_filter.enable : %d", sess_mgr_cfg->duplicated_packet_bloom_filter.enable); + SESSION_MANAGER_LOG_INFO("session_manager.duplicated_packet_bloom_filter.capacity : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.capacity); + SESSION_MANAGER_LOG_INFO("session_manager.duplicated_packet_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms); + SESSION_MANAGER_LOG_INFO("session_manager.duplicated_packet_bloom_filter.error_rate : %f", sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate); // eviction session filter - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.enable : %d", sess_mgr_cfg->evicted_session_bloom_filter.enable); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.capacity : %lu", sess_mgr_cfg->evicted_session_bloom_filter.capacity); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.error_rate : %f", sess_mgr_cfg->evicted_session_bloom_filter.error_rate); + SESSION_MANAGER_LOG_INFO("session_manager.evicted_session_bloom_filter.enable : %d", sess_mgr_cfg->evicted_session_bloom_filter.enable); + SESSION_MANAGER_LOG_INFO("session_manager.evicted_session_bloom_filter.capacity : %lu", sess_mgr_cfg->evicted_session_bloom_filter.capacity); + SESSION_MANAGER_LOG_INFO("session_manager.evicted_session_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms); + SESSION_MANAGER_LOG_INFO("session_manager.evicted_session_bloom_filter.error_rate : %f", sess_mgr_cfg->evicted_session_bloom_filter.error_rate); // TCP reassembly - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.enable : %d", sess_mgr_cfg->tcp_reassembly.enable); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.timeout_ms : %lu", sess_mgr_cfg->tcp_reassembly.timeout_ms); - SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.buffered_segments_max : %lu", sess_mgr_cfg->tcp_reassembly.buffered_segments_max); + SESSION_MANAGER_LOG_INFO("session_manager.tcp_reassembly.enable : %d", sess_mgr_cfg->tcp_reassembly.enable); + SESSION_MANAGER_LOG_INFO("session_manager.tcp_reassembly.timeout_ms : %lu", sess_mgr_cfg->tcp_reassembly.timeout_ms); + SESSION_MANAGER_LOG_INFO("session_manager.tcp_reassembly.buffered_segments_max : %lu", sess_mgr_cfg->tcp_reassembly.buffered_segments_max); } } @@ -777,13 +773,13 @@ static void session_manager_runtime_evicte_session(struct session_manager_runtim switch (session_get_type(sess)) { case SESSION_TYPE_TCP: - SESSION_MANAGER_RUNTIME_LOG_DEBUG("evicte tcp old session: %lu", session_get_id(sess)); + SESSION_MANAGER_LOG_DEBUG("evicte tcp old session: %lu", session_get_id(sess)); session_table_del(sess_mgr_rt->tcp_sess_table, sess); SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp); sess_mgr_rt->stat.tcp_sess_evicted++; break; case SESSION_TYPE_UDP: - SESSION_MANAGER_RUNTIME_LOG_DEBUG("evicte udp old session: %lu", session_get_id(sess)); + SESSION_MANAGER_LOG_DEBUG("evicte udp old session: %lu", session_get_id(sess)); session_table_del(sess_mgr_rt->udp_sess_table, sess); if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable) { @@ -1064,7 +1060,7 @@ void session_manager_runtime_free_session(struct session_manager_runtime *sess_m { if (sess) { - SESSION_MANAGER_RUNTIME_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess))); + SESSION_MANAGER_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess))); session_timer_del(sess_mgr_rt->sess_timer, sess); switch (session_get_type(sess)) @@ -1241,7 +1237,8 @@ uint64_t session_manager_runtime_clean_session(struct session_manager_runtime *s uint64_t expired_sess_num = 0; uint8_t expired_sess_canbe_clean = 0; - if (now_ms - sess_mgr_rt->last_clean_expired_sess_ts >= sess_mgr_rt->cfg.expire_period_ms) + if (now_ms - sess_mgr_rt->last_clean_expired_sess_ts >= sess_mgr_rt->cfg.expire_period_ms || + now_ms == UINT64_MAX) { expired_sess_canbe_clean = 1; } @@ -1295,25 +1292,25 @@ void session_manager_runtime_print_stat(struct session_manager_runtime *sess_mgr struct session_manager_stat *stat = &sess_mgr_rt->stat; // TCP session - SESSION_MANAGER_RUNTIME_LOG_INFO("TCP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu", - stat->history_tcp_sessions, stat->tcp_sess_used, stat->tcp_sess_opening, stat->tcp_sess_active, - stat->tcp_sess_closing, stat->tcp_sess_discard, stat->tcp_sess_closed); + SESSION_MANAGER_LOG_INFO("TCP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu", + stat->history_tcp_sessions, stat->tcp_sess_used, stat->tcp_sess_opening, stat->tcp_sess_active, + stat->tcp_sess_closing, stat->tcp_sess_discard, stat->tcp_sess_closed); // UDP session - SESSION_MANAGER_RUNTIME_LOG_INFO("UDP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu", - stat->history_udp_sessions, stat->udp_sess_used, stat->udp_sess_opening, stat->udp_sess_active, - stat->udp_sess_closing, stat->udp_sess_discard, stat->udp_sess_closed); + SESSION_MANAGER_LOG_INFO("UDP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu", + stat->history_udp_sessions, stat->udp_sess_used, stat->udp_sess_opening, stat->udp_sess_active, + stat->udp_sess_closing, stat->udp_sess_discard, stat->udp_sess_closed); // evicted session - SESSION_MANAGER_RUNTIME_LOG_INFO("evicted session: TCP=%lu, UDP=%lu", stat->tcp_sess_evicted, stat->udp_sess_evicted); + SESSION_MANAGER_LOG_INFO("evicted session: TCP=%lu, UDP=%lu", stat->tcp_sess_evicted, stat->udp_sess_evicted); // Bypassed packet - SESSION_MANAGER_RUNTIME_LOG_INFO("bypassed TCP packet: table_full=%lu, session_not_found=%lu, duplicated=%lu", - stat->tcp_pkts_bypass_table_full, stat->tcp_pkts_bypass_session_not_found, stat->tcp_pkts_bypass_duplicated); - SESSION_MANAGER_RUNTIME_LOG_INFO("bypassed UDP packet: table_full=%lu, session_evicted=%lu, duplicated=%lu", - stat->udp_pkts_bypass_table_full, stat->udp_pkts_bypass_session_evicted, stat->udp_pkts_bypass_duplicated); + SESSION_MANAGER_LOG_INFO("bypassed TCP packet: table_full=%lu, session_not_found=%lu, duplicated=%lu", + stat->tcp_pkts_bypass_table_full, stat->tcp_pkts_bypass_session_not_found, stat->tcp_pkts_bypass_duplicated); + SESSION_MANAGER_LOG_INFO("bypassed UDP packet: table_full=%lu, session_evicted=%lu, duplicated=%lu", + stat->udp_pkts_bypass_table_full, stat->udp_pkts_bypass_session_evicted, stat->udp_pkts_bypass_duplicated); // TCP segment - SESSION_MANAGER_RUNTIME_LOG_INFO("TCP segment: input=%lu, consumed=%lu, timeout=%lu, retransmited=%lu, overlapped=%lu, omitted_too_many=%lu, inorder=%lu, reordered=%lu, buffered=%lu, freed=%lu", - stat->tcp_segs_input, stat->tcp_segs_consumed, stat->tcp_segs_timeout, stat->tcp_segs_retransmited, - stat->tcp_segs_overlapped, stat->tcp_segs_omitted_too_many, stat->tcp_segs_inorder, stat->tcp_segs_reordered, - stat->tcp_segs_buffered, stat->tcp_segs_freed); + SESSION_MANAGER_LOG_INFO("TCP segment: input=%lu, consumed=%lu, timeout=%lu, retransmited=%lu, overlapped=%lu, omitted_too_many=%lu, inorder=%lu, reordered=%lu, buffered=%lu, freed=%lu", + stat->tcp_segs_input, stat->tcp_segs_consumed, stat->tcp_segs_timeout, stat->tcp_segs_retransmited, + stat->tcp_segs_overlapped, stat->tcp_segs_omitted_too_many, stat->tcp_segs_inorder, stat->tcp_segs_reordered, + stat->tcp_segs_buffered, stat->tcp_segs_freed); } /****************************************************************************** @@ -1428,7 +1425,7 @@ uint64_t session_manager_runtime_scan(const struct session_manager_runtime *sess } } - SESSION_MANAGER_RUNTIME_LOG_DEBUG("session scan: cursor=%lu, count=%lu, mached_sess_num=%lu", opts->cursor, opts->count, mached_sess_num); + SESSION_MANAGER_LOG_DEBUG("session scan: cursor=%lu, count=%lu, mached_sess_num=%lu", opts->cursor, opts->count, mached_sess_num); return mached_sess_num; } diff --git a/infra/session_manager/session_manager_runtime.h b/infra/session_manager/session_manager_runtime.h index 910c6a7..5218bcb 100644 --- a/infra/session_manager/session_manager_runtime.h +++ b/infra/session_manager/session_manager_runtime.h @@ -8,6 +8,10 @@ extern "C" #include "tuple.h" #include "stellar/session.h" +#define SESSION_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session manager", format, ##__VA_ARGS__) +#define SESSION_MANAGER_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session manager", format, ##__VA_ARGS__) +#define SESSION_MANAGER_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session manager", format, ##__VA_ARGS__) + struct session_manager_config { uint64_t session_id_seed; diff --git a/infra/stellar_core.c b/infra/stellar_core.c index c478c85..b3e4eec 100644 --- a/infra/stellar_core.c +++ b/infra/stellar_core.c @@ -5,9 +5,10 @@ #include "packet_io.h" #include "packet_internal.h" #include "packet_manager_internal.h" +#include "polling_manager_internal.h" + #include "stellar/stellar.h" #include "stellar/module_manager.h" -#include "polling_manager_internal.h" #define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__) @@ -34,7 +35,6 @@ struct stellar struct logger *logger; struct packet_io *pkt_io; struct mq_schema *mq_schema; - struct packet_manager *pkt_mgr; struct stellar_module_manager *mod_mgr; struct stellar_thread threads[MAX_THREAD_NUM]; }; @@ -49,12 +49,11 @@ static void *worker_thread(void *arg) uint16_t thread_id = thread->idx; struct stellar *st = thread->st; struct packet_io *pkt_io = st->pkt_io; - struct packet_manager *pkt_mgr = st->pkt_mgr; struct stellar_module_manager *mod_mgr = st->mod_mgr; struct mq_runtime *mq_rt = mq_runtime_new(st->mq_schema); - struct stellar_polling_manager *polling_mgr=stellar_module_get_polling_manager(mod_mgr); - - + struct stellar_polling_manager *polling_mgr = stellar_module_get_polling_manager(mod_mgr); + struct stellar_module *pkt_mgr_mod = stellar_module_manager_get_module(mod_mgr, PACKET_MANAGER_MODULE_NAME); + struct packet_manager *pkt_mgr = stellar_module_get_ctx(pkt_mgr_mod); snprintf(thread_name, sizeof(thread_name), "stellar:%d", thread_id); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); @@ -110,7 +109,7 @@ static void *worker_thread(void *arg) } idle_tasks: - stellar_polling_dispatch(polling_mgr); + stellar_polling_dispatch(polling_mgr); if (nr_pkt_rcv == 0) { @@ -160,23 +159,13 @@ static void stellar_thread_join(struct stellar *st) CORE_LOG_FATAL("all worker thread exited"); } -struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg_file, const char *log_cfg_file) +struct stellar *stellar_new(const char *toml_file) { - if (stellar_cfg_file == NULL) + if (toml_file == NULL) { printf("stellar config file is null\n"); return NULL; } - if (module_cfg_file == NULL) - { - printf("module config file is null\n"); - return NULL; - } - if (log_cfg_file == NULL) - { - printf("log config file is null\n"); - return NULL; - } struct stellar *st = (struct stellar *)calloc(1, sizeof(struct stellar)); if (st == NULL) @@ -184,7 +173,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg return NULL; } - st->logger = log_new(log_cfg_file); + st->logger = log_new(toml_file); if (st->logger == NULL) { printf("unable to create logger"); @@ -194,7 +183,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg __thread_local_logger = st->logger; CORE_LOG_FATAL("stellar start (version: %s)", version); - if (load_and_validate_toml_integer_config(stellar_cfg_file, "packet_io.nr_worker_thread", (uint64_t *)&st->thread_num, 1, MAX_THREAD_NUM) != 0) + if (load_and_validate_toml_integer_config(toml_file, "packet_io.nr_worker_thread", (uint64_t *)&st->thread_num, 1, MAX_THREAD_NUM) != 0) { CORE_LOG_ERROR("unable to get thread number from config file"); goto error_out; @@ -207,21 +196,14 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg goto error_out; } - st->pkt_mgr = packet_manager_new(st->mq_schema, stellar_cfg_file); - if (st->pkt_mgr == NULL) - { - CORE_LOG_ERROR("unable to create packet manager"); - goto error_out; - } - - st->mod_mgr = stellar_module_manager_new(module_cfg_file, st->thread_num, st->mq_schema, st->logger); + st->mod_mgr = stellar_module_manager_new(toml_file, st->thread_num, st->mq_schema, st->logger); if (st->mod_mgr == NULL) { CORE_LOG_ERROR("unable to create module manager"); goto error_out; } - st->pkt_io = packet_io_new(stellar_cfg_file); + st->pkt_io = packet_io_new(toml_file); if (st->pkt_io == NULL) { CORE_LOG_ERROR("unable to create packet io"); @@ -273,7 +255,6 @@ void stellar_free(struct stellar *st) packet_io_free(st->pkt_io); stellar_module_manager_free(st->mod_mgr); - packet_manager_free(st->pkt_mgr); mq_schema_free(st->mq_schema); CORE_LOG_FATAL("stellar exit\n"); diff --git a/infra/version.map b/infra/version.map index 8a87296..e9008aa 100644 --- a/infra/version.map +++ b/infra/version.map @@ -52,8 +52,14 @@ global: log_print; log_check_level; - session_manager_module_on_init; - session_manager_module_on_exit; + polling_manager_on_init; + polling_manager_on_exit; + + packet_manager_on_init; + packet_manager_on_exit; + + session_manager_on_init; + session_manager_on_exit; http_message_*; http_decoder_init;