From b3ddebf7704a624ccfb41607f35f7a38732ff38a Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Fri, 20 Sep 2024 18:41:07 +0800 Subject: [PATCH] refactor(main loop): compiled --- CMakeLists.txt | 2 +- include/stellar/stellar.h | 4 +- infra/main.c | 2 +- infra/packet_io/packet_io.c | 64 +-- infra/packet_io/packet_io.h | 6 +- infra/session_manager/CMakeLists.txt | 4 +- infra/session_manager/conf/spec.toml | 4 + infra/session_manager/session_manager.c | 2 +- infra/stellar_core.c | 512 ++++++------------------ infra/version.map | 2 - 10 files changed, 164 insertions(+), 438 deletions(-) create mode 100644 infra/session_manager/conf/spec.toml diff --git a/CMakeLists.txt b/CMakeLists.txt index d8f7288..27c88ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -87,4 +87,4 @@ add_subdirectory(include) add_subdirectory(test) install(DIRECTORY DESTINATION log COMPONENT PROGRAM) -install(DIRECTORY DESTINATION plugin COMPONENT PROGRAM) \ No newline at end of file +install(DIRECTORY DESTINATION module COMPONENT PROGRAM) \ No newline at end of file diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index 397e1a4..a2b6f28 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -26,9 +26,9 @@ int stellar_polling_subscribe(struct stellar *st, plugin_on_polling_func on_pol void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str); // only send user build packet, can't send packet which come from network -void stellar_send_build_packet(struct stellar *st, struct packet *pkt); +// void stellar_send_build_packet(struct stellar *st, struct packet *pkt); -struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg_file, const char *log_cfg_file); +struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg_file, const char *log_cfg_file); void stellar_run(struct stellar *st); void stellar_free(struct stellar *st); void stellar_loopbreak(struct stellar *st); diff --git a/infra/main.c b/infra/main.c index becc338..171ef09 100644 --- a/infra/main.c +++ b/infra/main.c @@ -37,7 +37,7 @@ int main(int argc __attribute__((__unused__)), char **argv __attribute__((__unus signal(SIGTERM, signal_handler); signal(SIGHUP, signal_handler); - st = stellar_new("./conf/stellar.toml", "./plugin/spec.toml", "./conf/log.toml"); + st = stellar_new("./conf/stellar.toml", "./module/spec.toml", "./conf/log.toml"); if (st == NULL) { return 0; diff --git a/infra/packet_io/packet_io.c b/infra/packet_io/packet_io.c index 673b1ce..70d0599 100644 --- a/infra/packet_io/packet_io.c +++ b/infra/packet_io/packet_io.c @@ -12,6 +12,7 @@ struct packet_io { + struct packet_io_config *cfg; void *handle; void *(*new_func)(const struct packet_io_config *cfg); @@ -27,7 +28,7 @@ struct packet_io struct packet_io_stat *(*stat_func)(void *handle, uint16_t thr_idx); }; -int packet_io_config_load(struct packet_io_config *cfg, const char *toml_file) +static struct packet_io_config *packet_io_config_new(const char *toml_file) { int ret = -1; const char *ptr; @@ -41,6 +42,12 @@ int packet_io_config_load(struct packet_io_config *cfg, const char *toml_file) toml_table_t *table = NULL; toml_array_t *mask; + struct packet_io_config *cfg = (struct packet_io_config *)calloc(1, sizeof(struct packet_io_config)); + if (cfg == NULL) + { + return NULL; + } + fp = fopen(toml_file, "r"); if (fp == NULL) { @@ -185,32 +192,18 @@ error_out: fclose(fp); } - return ret; + if (ret == -1) + { + free(cfg); + return NULL; + } + else + { + return cfg; + } } -struct packet_io_config *packet_io_config_new(const char *toml_file) -{ - if (toml_file == NULL) - { - return NULL; - } - - struct packet_io_config *cfg = (struct packet_io_config *)calloc(1, sizeof(struct packet_io_config)); - if (cfg == NULL) - { - return NULL; - } - - if (packet_io_config_load(cfg, toml_file) == -1) - { - packet_io_config_free(cfg); - return NULL; - } - - return cfg; -} - -void packet_io_config_free(struct packet_io_config *cfg) +static void packet_io_config_free(struct packet_io_config *cfg) { if (cfg) { @@ -219,7 +212,7 @@ void packet_io_config_free(struct packet_io_config *cfg) } } -void packet_io_config_print(const struct packet_io_config *cfg) +static void packet_io_config_print(const struct packet_io_config *cfg) { if (cfg) { @@ -242,7 +235,7 @@ void packet_io_config_print(const struct packet_io_config *cfg) } } -struct packet_io *packet_io_new(const struct packet_io_config *cfg) +struct packet_io *packet_io_new(const char *toml_file) { struct packet_io *pkt_io = (struct packet_io *)calloc(1, sizeof(struct packet_io)); if (pkt_io == NULL) @@ -250,7 +243,16 @@ struct packet_io *packet_io_new(const struct packet_io_config *cfg) return NULL; } - if (cfg->mode == PACKET_IO_MARSIO) + pkt_io->cfg = packet_io_config_new(toml_file); + if (pkt_io->cfg == NULL) + { + free(pkt_io); + return NULL; + } + + packet_io_config_print(pkt_io->cfg); + + if (pkt_io->cfg->mode == PACKET_IO_MARSIO) { pkt_io->new_func = marsio_io_new; pkt_io->free_func = marsio_io_free; @@ -277,7 +279,7 @@ struct packet_io *packet_io_new(const struct packet_io_config *cfg) pkt_io->stat_func = pcap_io_stat; } - pkt_io->handle = pkt_io->new_func(cfg); + pkt_io->handle = pkt_io->new_func(pkt_io->cfg); if (pkt_io->handle == NULL) { packet_io_free(pkt_io); @@ -295,6 +297,10 @@ void packet_io_free(struct packet_io *pkt_io) { pkt_io->free_func(pkt_io->handle); } + if (pkt_io->cfg) + { + packet_io_config_free(pkt_io->cfg); + } free(pkt_io); pkt_io = NULL; } diff --git a/infra/packet_io/packet_io.h b/infra/packet_io/packet_io.h index 5b677e8..d5b089b 100644 --- a/infra/packet_io/packet_io.h +++ b/infra/packet_io/packet_io.h @@ -64,13 +64,9 @@ struct packet_io_config uint64_t idle_yield_interval_ms; // range: [0, 6000] (ms) }; -struct packet_io_config *packet_io_config_new(const char *toml_file); -void packet_io_config_free(struct packet_io_config *cfg); -void packet_io_config_print(const struct packet_io_config *cfg); - struct packet; struct packet_io; -struct packet_io *packet_io_new(const struct packet_io_config *cfg); +struct packet_io *packet_io_new(const char *toml_file); void packet_io_free(struct packet_io *pkt_io); int packet_io_isbreak(struct packet_io *pkt_io); diff --git a/infra/session_manager/CMakeLists.txt b/infra/session_manager/CMakeLists.txt index a045740..9e9091e 100644 --- a/infra/session_manager/CMakeLists.txt +++ b/infra/session_manager/CMakeLists.txt @@ -13,4 +13,6 @@ target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/) target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/include) target_link_libraries(session_manager timeout packet_manager tcp_reassembly mq exdata) -add_subdirectory(test) \ No newline at end of file +add_subdirectory(test) + +install(FILES conf/spec.toml DESTINATION module COMPONENT PROGRAM) \ No newline at end of file diff --git a/infra/session_manager/conf/spec.toml b/infra/session_manager/conf/spec.toml new file mode 100644 index 0000000..315f02a --- /dev/null +++ b/infra/session_manager/conf/spec.toml @@ -0,0 +1,4 @@ +[[module]] +path = "./lib/libstellar.so" +init = "session_manager_module_on_init" +exit = "session_manager_module_on_exit" diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index e8b2a08..a9b5c38 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -14,7 +14,6 @@ #pragma GCC diagnostic ignored "-Wunused-function" #define SESSION_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session manager", format, ##__VA_ARGS__) -#define SESSION_MANAGER_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session manager", format, ##__VA_ARGS__) #define SESSION_MANAGER_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session manager", format, ##__VA_ARGS__) struct session_manager_schema @@ -334,6 +333,7 @@ struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, stru SESSION_MANAGER_LOG_ERROR("failed to create session_manager_config"); goto error_out; } + session_manager_config_print(sess_mgr->cfg); sess_mgr->schema = session_manager_schema_new(pkt_mgr, mq_schema, sess_mgr); if (sess_mgr->schema == NULL) diff --git a/infra/stellar_core.c b/infra/stellar_core.c index 56131d6..49a3546 100644 --- a/infra/stellar_core.c +++ b/infra/stellar_core.c @@ -1,24 +1,13 @@ -#include -#include -#include #include -#include -#include -#include #include #include +#include "packet_io.h" +#include "packet_internal.h" +#include "packet_manager_internal.h" #include "stellar/stellar.h" #include "stellar/module_manager.h" -#include "utils.h" -#include "packet_io.h" -#include "log_internal.h" -#include "stellar_stat.h" -#include "packet_internal.h" -#include "session_internal.h" -#include "session_manager_runtime.h" - #define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "core", format, ##__VA_ARGS__) @@ -29,325 +18,119 @@ static __attribute__((__used__)) const char *version = STELLAR_GIT_VERSION; static __attribute__((__used__)) const char *version = "Unknown"; #endif -static const char logo_str[] = - " _ _ _\n" - " ___ | |_ ___ | | | | __ _ _ __\n" - " / __| | __| / _ \\ | | | | / _` | | '__|\n" - " \\__ \\ | |_ | __/ | | | | | (_| | | |\n" - " |___/ \\__| \\___| |_| |_| \\__,_| |_|\n"; - struct stellar_thread { pthread_t tid; uint16_t idx; uint64_t is_runing; - struct ip_reassembly *ip_reass; - struct session_manager_runtime *sess_mgr_rt; struct stellar *st; }; -struct stellar_runtime -{ - uint64_t need_exit; - struct logger *logger; - struct stellar_stat *stat; - struct packet_io *packet_io; - struct mq_schema *mq_schema; - struct stellar_module_manager *mod_mgr; - struct stellar_thread threads[MAX_THREAD_NUM]; -}; - -struct stellar_config -{ - uint64_t instance_id; - struct session_manager_config *sess_mgr_cfg; - struct ip_reassembly_config *ip_reass_cfg; - struct packet_io_config *pkt_io_cfg; - struct stellar_stat_config *stat_cfg; -}; - struct stellar { - char magic[2]; // for check memory corruption - struct stellar_config config; - struct stellar_runtime runtime; + uint16_t thread_num; + uint64_t need_exit; + struct logger *logger; + struct packet_io *pkt_io; + struct mq_schema *mq_schema; + struct packet_manager *pkt_mgr; + struct stellar_module_manager *mod_mgr; + struct stellar_thread threads[MAX_THREAD_NUM]; }; -static __thread uint16_t __current_thread_idx = UINT16_MAX; - -/****************************************************************************** - * Stellar Thread Main Loop - ******************************************************************************/ - -static void update_stat(struct session *sess, struct packet *pkt) -{ - enum flow_type type = session_get_flow_type(sess); - assert(type != FLOW_TYPE_NONE); - int is_ctrl = packet_is_ctrl(pkt); - uint16_t len = packet_get_raw_len(pkt); - switch (packet_get_action(pkt)) - { - case PACKET_ACTION_DROP: - session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_PACKETS_DROPPED : STAT_RAW_PACKETS_DROPPED), 1); - session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_BYTES_DROPPED : STAT_RAW_BYTES_DROPPED), len); - break; - case PACKET_ACTION_FORWARD: - session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_PACKETS_TRANSMITTED : STAT_RAW_PACKETS_TRANSMITTED), 1); - session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_BYTES_TRANSMITTED : STAT_RAW_BYTES_TRANSMITTED), len); - break; - default: - assert(0); - break; - } -} - -static inline void clean_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms) -{ - struct session *sess = NULL; - struct session *cleaned_sess[RX_BURST_MAX * 16]; - uint64_t nr_sess_cleaned = session_manager_runtime_clean_session(sess_mgr_rt, now_ms, cleaned_sess, sizeof(cleaned_sess) / sizeof(cleaned_sess[0])); - for (uint64_t j = 0; j < nr_sess_cleaned; j++) - { - sess = cleaned_sess[j]; - // session_exdata_runtime_free(session_get_user_data(sess)); - session_manager_runtime_free_session(sess_mgr_rt, sess); - } -} - static void *worker_thread(void *arg) { - int nr_pkt_received = 0; - uint64_t now_ms = 0; - char thd_name[16] = {0}; + int nr_pkt_rcv = 0; + char thread_name[16] = {0}; struct packet *pkt = NULL; - struct packet *defraged_pkt = NULL; struct packet packets[RX_BURST_MAX]; - struct session *sess = NULL; struct stellar_thread *thread = (struct stellar_thread *)arg; - struct ip_reassembly *ip_reass = thread->ip_reass; - struct session_manager_runtime *sess_mgr_rt = thread->sess_mgr_rt; - struct session_manager_stat *sess_stat = session_manager_runtime_get_stat(sess_mgr_rt); + uint16_t thread_id = thread->idx; struct stellar *st = thread->st; - struct stellar_runtime *runtime = &st->runtime; - struct packet_io *packet_io = runtime->packet_io; - struct stellar_module_manager *mod_mgr = runtime->mod_mgr; - struct mq_runtime *mq_rt = mq_runtime_new(runtime->mq_schema); + struct packet_io *pkt_io = st->pkt_io; + struct packet_manager *pkt_mgr = st->pkt_mgr; + struct stellar_module_manager *mod_mgr = st->mod_mgr; + struct mq_runtime *mq_rt = mq_runtime_new(st->mq_schema); - stellar_module_manager_register_thread(mod_mgr, thread->tid, mq_rt); + snprintf(thread_name, sizeof(thread_name), "stellar:%d", thread_id); + prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); - struct thread_stat thr_stat = { - .pkt_io = packet_io_stat(packet_io, thread->idx), - .ip_reass = ip_reassembly_stat(ip_reass), - .sess_mgr = session_manager_runtime_get_stat(sess_mgr_rt), - }; - uint16_t thr_idx = thread->idx; + __thread_local_logger = st->logger; + stellar_module_manager_register_thread(mod_mgr, thread_id, mq_rt); - __current_thread_idx = thr_idx; - __thread_local_logger = runtime->logger; - - memset(packets, 0, sizeof(packets)); - - for (int i = 0; i < RX_BURST_MAX; i++) - { - packet_set_user_data(&packets[i], (void *)mod_mgr); - } - - snprintf(thd_name, sizeof(thd_name), "stellar:%d", thr_idx); - prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL); - - if (packet_io_init(packet_io, thr_idx) != 0) + if (packet_io_init(pkt_io, thread_id) != 0) { CORE_LOG_ERROR("unable to init packet io"); return NULL; } + packet_manager_init(pkt_mgr, thread_id, mq_rt); ATOMIC_SET(&thread->is_runing, 1); - CORE_LOG_FATAL("worker thread %d runing", thr_idx); + CORE_LOG_FATAL("worker thread %d runing", thread_id); - while (ATOMIC_READ(&runtime->need_exit) == 0) + while (ATOMIC_READ(&st->need_exit) == 0) { - /* - * We use the system's real time instead of monotonic time for the following reasons: - * -> Session creation/closure times require real time (e.g., for logging session activities). - * -> Session ID generation relies on real time (e.g., for reverse calculating session creation time from the session ID). - * - * Note: Modifying the system time will affect the timing wheel, impacting session expiration, IP reassembly expiration, and TCP reassembly expiration. - * Suggestion: After modifying the system time, restart the service to ensure consistent timing. - */ - now_ms = clock_get_real_time_ms(); - nr_pkt_received = packet_io_ingress(packet_io, thr_idx, packets, RX_BURST_MAX); - if (nr_pkt_received == 0) + // TODO + memset(packets, 0, sizeof(packets)); + nr_pkt_rcv = packet_io_ingress(pkt_io, thread_id, packets, RX_BURST_MAX); + if (nr_pkt_rcv == 0) { goto idle_tasks; } - for (int i = 0; i < nr_pkt_received; i++) + for (int i = 0; i < nr_pkt_rcv; i++) { - sess = NULL; - defraged_pkt = NULL; - pkt = &packets[i]; + // TODO alloc struct packet from packet pool + pkt = calloc(1, sizeof(struct packet)); + memcpy(pkt, &packets[i], sizeof(struct packet)); + pkt->need_free = 1; - // plugin_manager_on_packet_input(plug_mgr, pkt); - if (packet_is_fragment(pkt)) - { - defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now_ms); - if (defraged_pkt == NULL) - { - goto fast_path; - } - else - { - pkt = defraged_pkt; - // plugin_manager_on_packet_input(plug_mgr, defraged_pkt); - } - } + packet_manager_ingress(pkt_mgr, thread_id, pkt); + packet_manager_dispatch(pkt_mgr, thread_id); + pkt = packet_manager_egress(pkt_mgr, thread_id); - sess = session_manager_runtime_lookup_session_by_packet(sess_mgr_rt, pkt); - if (sess == NULL) + if (pkt == NULL) { - sess = session_manager_runtime_new_session(sess_mgr_rt, pkt, now_ms); - if (sess == NULL) - { - goto fast_path; - } - // struct exdata_runtime *per_sess_exdata=session_exdata_runtime_new(st); - // session_set_user_data(sess, per_sess_exdata); - } - else - { - if (session_manager_runtime_update_session(sess_mgr_rt, sess, pkt, now_ms) == -1) - { - goto fast_path; - } - } - - fast_path: - if (pkt == defraged_pkt) - { - // plugin_manager_on_packet_output(plug_mgr, defraged_pkt); - // plugin_manager_on_packet_output(plug_mgr, &packets[i]); - } - else - { - // plugin_manager_on_packet_output(plug_mgr, pkt); - } - - if (sess) - { - if (session_get_current_state(sess) == SESSION_STATE_DISCARD) - - { - packet_set_action(pkt, PACKET_ACTION_DROP); - } - - update_stat(sess, pkt); - session_set_current_packet(sess, NULL); - session_set_flow_type(sess, FLOW_TYPE_NONE); + continue; } if (packet_get_action(pkt) == PACKET_ACTION_DROP) { - if (pkt == defraged_pkt) - { - packet_io_drop(packet_io, thr_idx, &packets[i], 1); - packet_free(defraged_pkt); - } - else - { - packet_io_drop(packet_io, thr_idx, pkt, 1); - } + packet_io_drop(pkt_io, thread_id, pkt, 1); } - else // PACKET_ACTION_FORWARD + else { - if (pkt == defraged_pkt) - { - // TODO - // copy meta from defraged_pkt to packets[i] - packet_io_egress(packet_io, thr_idx, &packets[i], 1); - packet_free(defraged_pkt); - } - else - { - packet_io_egress(packet_io, thr_idx, pkt, 1); - } + packet_io_egress(pkt_io, thread_id, pkt, 1); } + + // TODO polling } idle_tasks: - clean_session(sess_mgr_rt, now_ms); - ip_reassembly_expire(ip_reass, now_ms); - // plugin_manager_on_polling(plug_mgr); - stellar_stat_merge(runtime->stat, &thr_stat, thr_idx, now_ms); + // TODO polling - if (nr_pkt_received == 0) + if (nr_pkt_rcv == 0) { - packet_io_yield(packet_io, thr_idx); + packet_io_yield(pkt_io, thread_id); } } - // before exit wait all session close - while (sess_stat->tcp_sess_used > 0 || sess_stat->udp_sess_used > 0) - { - now_ms = clock_get_real_time_ms(); - clean_session(sess_mgr_rt, now_ms); - usleep(1000); // 1ms - } - - stellar_stat_merge(runtime->stat, &thr_stat, thread->idx, UINT64_MAX); - stellar_stat_print(runtime->stat, &thr_stat, thread->idx); - mq_runtime_free(mq_rt); ATOMIC_SET(&thread->is_runing, 0); - CORE_LOG_FATAL("worker thread %d exit", thr_idx); + CORE_LOG_FATAL("worker thread %d exit", thread_id); return NULL; } -/****************************************************************************** - * Stellar Main Function - ******************************************************************************/ - -static int stellar_thread_init(struct stellar *st) +static int stellar_thread_run(struct stellar *st) { - struct stellar_runtime *runtime = &st->runtime; - struct stellar_config *config = &st->config; - - uint64_t now_ms = clock_get_real_time_ms(); - for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++) + for (uint16_t i = 0; i < st->thread_num; i++) { - struct stellar_thread *thread = &runtime->threads[i]; + struct stellar_thread *thread = &st->threads[i]; thread->idx = i; thread->is_runing = 0; thread->st = st; - - config->sess_mgr_cfg->session_id_seed = config->instance_id << 8 | i; - thread->sess_mgr_rt = session_manager_runtime_new(config->sess_mgr_cfg, now_ms); - if (thread->sess_mgr_rt == NULL) - { - CORE_LOG_ERROR("unable to create session manager"); - return -1; - } - - thread->ip_reass = ip_reassembly_new(config->ip_reass_cfg, now_ms); - if (thread->ip_reass == NULL) - { - CORE_LOG_ERROR("unable to create ip reassemble manager"); - return -1; - } - } - - return 0; -} - -static int stellar_thread_run(struct stellar *st) -{ - struct stellar_runtime *runtime = &st->runtime; - struct stellar_config *config = &st->config; - - for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++) - { - struct stellar_thread *thread = &runtime->threads[i]; if (pthread_create(&thread->tid, NULL, worker_thread, (void *)thread) < 0) { CORE_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno)); @@ -360,31 +143,30 @@ static int stellar_thread_run(struct stellar *st) static void stellar_thread_join(struct stellar *st) { - struct stellar_runtime *runtime = &st->runtime; - struct stellar_config *config = &st->config; - CORE_LOG_FATAL("waiting worker thread exit"); - for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++) + for (uint16_t i = 0; i < st->thread_num; i++) { - struct stellar_thread *thread = &runtime->threads[i]; - pthread_join(thread->tid, NULL); + if (st->threads[i].is_runing == 0) + { + continue; + } - ip_reassembly_free(thread->ip_reass); - session_manager_runtime_free(thread->sess_mgr_rt); + struct stellar_thread *thread = &st->threads[i]; + pthread_join(thread->tid, NULL); } CORE_LOG_FATAL("all worker thread exited"); } -struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg_file, const char *log_cfg_file) +struct stellar *stellar_new(const char *stellar_cfg_file, const char *module_cfg_file, const char *log_cfg_file) { if (stellar_cfg_file == NULL) { printf("stellar config file is null\n"); return NULL; } - if (plugin_cfg_file == NULL) + if (module_cfg_file == NULL) { - printf("plugin config file is null\n"); + printf("module config file is null\n"); return NULL; } if (log_cfg_file == NULL) @@ -398,91 +180,55 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg { return NULL; } - st->magic[0] = 0x4d; - st->magic[1] = 0x5a; - struct stellar_runtime *runtime = &st->runtime; - struct stellar_config *config = &st->config; - runtime->logger = log_new(log_cfg_file); - if (runtime->logger == NULL) + st->logger = log_new(log_cfg_file); + if (st->logger == NULL) { printf("unable to create logger"); goto error_out; } - __thread_local_logger = runtime->logger; - CORE_LOG_FATAL("stellar start (version: %s)\n %s", version, logo_str); - CORE_LOG_FATAL("stellar config file: %s, plugin config file: %s, log config file: %s", stellar_cfg_file, plugin_cfg_file, log_cfg_file); - if (load_and_validate_toml_integer_config(stellar_cfg_file, "instance.id", (uint64_t *)&config->instance_id, 0, 4095) != 0) + __thread_local_logger = st->logger; + CORE_LOG_FATAL("stellar start (version: %s)", version); + + if (load_and_validate_toml_integer_config(stellar_cfg_file, "packet_io.nr_worker_thread", (uint64_t *)&st->thread_num, 1, MAX_THREAD_NUM) != 0) { - CORE_LOG_ERROR("unable to load instance id"); + CORE_LOG_ERROR("unable to get thread number from config file"); goto error_out; } - config->sess_mgr_cfg = session_manager_config_new(stellar_cfg_file); - if (config->sess_mgr_cfg == NULL) + st->mq_schema = mq_schema_new(); + if (st->mq_schema == NULL) { - CORE_LOG_ERROR("unable to create session manager config"); - goto error_out; - } - config->ip_reass_cfg = ip_reassembly_config_new(stellar_cfg_file); - if (config->ip_reass_cfg == NULL) - { - CORE_LOG_ERROR("unable to create ip reassembly config"); - goto error_out; - } - config->pkt_io_cfg = packet_io_config_new(stellar_cfg_file); - if (config->pkt_io_cfg == NULL) - { - CORE_LOG_ERROR("unable to create packet io config"); - goto error_out; - } - config->stat_cfg = stellar_stat_config_new(stellar_cfg_file); - if (config->stat_cfg == NULL) - { - CORE_LOG_ERROR("unable to create stellar stat config"); - goto error_out; - } - stellar_stat_config_print(config->stat_cfg); - packet_io_config_print(config->pkt_io_cfg); - ip_reassembly_config_print(config->ip_reass_cfg); - session_manager_config_print(config->sess_mgr_cfg); - - runtime->stat = stellar_stat_new(config->stat_cfg, clock_get_real_time_ms()); - if (runtime->stat == NULL) - { - CORE_LOG_ERROR("unable to create stellar stat"); - goto error_out; - } - runtime->mq_schema = mq_schema_new(); - runtime->mod_mgr = stellar_module_manager_new(plugin_cfg_file, config->pkt_io_cfg->nr_worker_thread, runtime->mq_schema); - if (runtime->mod_mgr == NULL) - { - CORE_LOG_ERROR("unable to create plugin manager"); + CORE_LOG_ERROR("unable to create mq schema"); goto error_out; } - runtime->packet_io = packet_io_new(config->pkt_io_cfg); - if (runtime->packet_io == NULL) + st->pkt_mgr = packet_manager_new(st->mq_schema, stellar_cfg_file); + if (st->pkt_mgr == NULL) + { + CORE_LOG_ERROR("unable to create packet manager"); + goto error_out; + } + + st->mod_mgr = stellar_module_manager_new(module_cfg_file, st->thread_num, st->mq_schema); + if (st->mod_mgr == NULL) + { + CORE_LOG_ERROR("unable to create module manager"); + goto error_out; + } + + st->pkt_io = packet_io_new(stellar_cfg_file); + if (st->pkt_io == NULL) { CORE_LOG_ERROR("unable to create packet io"); goto error_out; } - if (stellar_thread_init(st) != 0) - { - CORE_LOG_ERROR("unable to init thread context"); - goto error_out; - } - return st; error_out: - if (st == NULL) - { - stellar_free(st); - st = NULL; - } + stellar_free(st); return NULL; } @@ -494,30 +240,20 @@ void stellar_run(struct stellar *st) return; } - struct stellar_runtime *runtime = &st->runtime; - if (stellar_thread_run(st) != 0) { CORE_LOG_ERROR("unable to create worker thread"); return; } - while (!ATOMIC_READ(&runtime->need_exit)) + while (!ATOMIC_READ(&st->need_exit)) { - if (st->magic[0] != 0x4d || st->magic[1] != 0x5a) - { - CORE_LOG_FATAL("memory corruption detected"); - ATOMIC_SET(&runtime->need_exit, 1); - break; - } - - stellar_stat_output(runtime->stat, clock_get_real_time_ms()); usleep(1000); // 1ms // only available in pcap mode - if (packet_io_isbreak(runtime->packet_io)) + if (packet_io_isbreak(st->pkt_io)) { - ATOMIC_SET(&runtime->need_exit, 1); + ATOMIC_SET(&st->need_exit, 1); CORE_LOG_FATAL("notify worker thread to exit"); break; } @@ -530,21 +266,15 @@ void stellar_free(struct stellar *st) { if (st) { - struct stellar_runtime *runtime = &st->runtime; - struct stellar_config *config = &st->config; + stellar_thread_join(st); - packet_io_free(runtime->packet_io); - stellar_module_manager_free(runtime->mod_mgr); - mq_schema_free(runtime->mq_schema); - stellar_stat_free(runtime->stat); - - session_manager_config_free(config->sess_mgr_cfg); - ip_reassembly_config_free(config->ip_reass_cfg); - packet_io_config_free(config->pkt_io_cfg); - stellar_stat_config_free(config->stat_cfg); + packet_io_free(st->pkt_io); + stellar_module_manager_free(st->mod_mgr); + packet_manager_free(st->pkt_mgr); + mq_schema_free(st->mq_schema); CORE_LOG_FATAL("stellar exit\n"); - log_free(runtime->logger); + log_free(st->logger); free(st); st = NULL; @@ -555,8 +285,7 @@ void stellar_loopbreak(struct stellar *st) { if (st) { - struct stellar_runtime *runtime = &st->runtime; - ATOMIC_SET(&runtime->need_exit, 1); + ATOMIC_SET(&st->need_exit, 1); } } @@ -564,7 +293,7 @@ void stellar_reload_log_level(struct stellar *st) { if (st) { - log_reload_level(st->runtime.logger); + log_reload_level(st->logger); } } @@ -572,50 +301,41 @@ void stellar_reload_log_level(struct stellar *st) * Stellar Utility Function ******************************************************************************/ +// TODO +#if 0 // only send user build packet, can't send packet which come from network void stellar_send_build_packet(struct stellar *st, struct packet *pkt) { - uint16_t thr_idx = stellar_module_manager_get_thread_id(st->runtime.mod_mgr); - struct packet_io *packet_io = st->runtime.packet_io; - struct session_manager_runtime *sess_mgr_rt = st->runtime.threads[thr_idx].sess_mgr_rt; + uint16_t thread_id = stellar_module_manager_get_thread_id(st->st.mod_mgr); + struct packet_io *pkt_io = st->st.pkt_io; + struct session_manager_runtime *sess_mgr_rt = st->st.threads[thread_id].sess_mgr_rt; session_manager_runtime_record_duplicated_packet(sess_mgr_rt, pkt); + if (packet_is_claim(pkt)) + { + PACKET_LOG_ERROR("packet has been claimed and cannot be released, please check the module arrangement order"); + assert(0); + return; + } + if (packet_get_origin_ctx(pkt)) { // TODO abort(); - packet_io_egress(packet_io, thr_idx, pkt, 1); + packet_io_egress(pkt_io, thread_id, pkt, 1); } else { - packet_io_inject(packet_io, thr_idx, pkt, 1); - } -} - -int stellar_get_worker_thread_num(struct stellar *st) -{ - return st->config.pkt_io_cfg->nr_worker_thread; -} - -uint16_t stellar_get_current_thread_index() -{ - if (__current_thread_idx == UINT16_MAX) - { - printf("get current thread index before set\n"); - abort(); - return UINT16_MAX; - } - else - { - return __current_thread_idx; + packet_io_inject(pkt_io, thread_id, pkt, 1); } } +#endif struct logger *stellar_get_logger(struct stellar *st) { if (st) { - return st->runtime.logger; + return st->logger; } else { diff --git a/infra/version.map b/infra/version.map index f241d50..8a87296 100644 --- a/infra/version.map +++ b/infra/version.map @@ -42,8 +42,6 @@ global: stellar_session_plugin_dettach_current_session; stellar_packet_plugin_register; stellar_polling_plugin_register; - stellar_get_current_thread_index; - stellar_get_worker_thread_num; stellar_send_build_packet; stellar_new; stellar_run;