From beb7d2f0ca4f62d96cbe2295bb2bc482f0492d02 Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Mon, 2 Sep 2024 16:03:08 +0800 Subject: [PATCH] stellar stat parses output related configuration items --- conf/stellar.toml | 6 +- infra/core/CMakeLists.txt | 2 +- infra/core/stellar_config.c | 109 --------- infra/core/stellar_config.h | 24 -- infra/core/stellar_core.c | 116 +++++---- infra/core/stellar_stat.c | 230 +++++++++++------- infra/core/stellar_stat.h | 23 +- .../test_based_on_stellar/env/stellar.toml | 6 +- test/packet_inject/conf/stellar.toml | 6 +- 9 files changed, 228 insertions(+), 294 deletions(-) delete mode 100644 infra/core/stellar_config.c delete mode 100644 infra/core/stellar_config.h diff --git a/conf/stellar.toml b/conf/stellar.toml index 986c18b..f5d4040 100644 --- a/conf/stellar.toml +++ b/conf/stellar.toml @@ -59,6 +59,6 @@ timeout_ms = 10000 # range: [1, 60000] (ms) buffered_segments_max = 256 # range: [2, 4096] per flow -[schedule] - merge_stat_interval = 500 # range: [1, 60000] (ms) - output_stat_interval = 2000 # range: [1, 60000] (ms) +[stat] + merge_interval_ms = 500 # range: [0, 60000] (ms) + output_interval_ms = 1000 # range: [0, 60000] (ms) \ No newline at end of file diff --git a/infra/core/CMakeLists.txt b/infra/core/CMakeLists.txt index ea74acc..89b5b0f 100644 --- a/infra/core/CMakeLists.txt +++ b/infra/core/CMakeLists.txt @@ -1,3 +1,3 @@ -add_library(core stellar_config.c stellar_stat.c stellar_core.c) +add_library(core stellar_stat.c stellar_core.c) target_link_libraries(core PUBLIC packet_io ip_reassembly plugin_manager) diff --git a/infra/core/stellar_config.c b/infra/core/stellar_config.c deleted file mode 100644 index df9cb52..0000000 --- a/infra/core/stellar_config.c +++ /dev/null @@ -1,109 +0,0 @@ -#include -#include -#include - -#include "toml.h" -#include "log_private.h" -#include "stellar_config.h" - -#define CONFIG_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "config", format, ##__VA_ARGS__) -#define CONFIG_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "config", format, ##__VA_ARGS__) - -// return 0: success -// retuun -1: failed -static int parse_schedule_options(toml_table_t *root, struct schedule_options *opts) -{ - const char *ptr; - toml_table_t *table; - - table = toml_table_in(root, "schedule"); - if (table == NULL) - { - CONFIG_LOG_ERROR("config file missing schedule section"); - return -1; - } - - ptr = toml_raw_in(table, "merge_stat_interval"); - if (ptr == NULL) - { - CONFIG_LOG_ERROR("config file missing schedule->merge_stat_interval"); - return -1; - } - opts->merge_stat_interval = atoll(ptr); - if (opts->merge_stat_interval < 1 || opts->merge_stat_interval > 60000) - { - CONFIG_LOG_ERROR("config file invalid schedule->merge_stat_interval %ld, range [1, 60000]", opts->merge_stat_interval); - return -1; - } - - ptr = toml_raw_in(table, "output_stat_interval"); - if (ptr == NULL) - { - CONFIG_LOG_ERROR("config file missing schedule->output_stat_interval"); - return -1; - } - opts->output_stat_interval = atoll(ptr); - if (opts->output_stat_interval < 1 || opts->output_stat_interval > 60000) - { - CONFIG_LOG_ERROR("config file invalid schedule->output_stat_interval %ld, range [1, 60000]", opts->output_stat_interval); - return -1; - } - - return 0; -} - -// return 0: success -// retuun -1: failed -int stellar_config_load(struct stellar_config *config, const char *file) -{ - int ret = -1; - char errbuf[200]; - FILE *fp = NULL; - toml_table_t *table = NULL; - - fp = fopen(file, "r"); - if (fp == NULL) - { - CONFIG_LOG_ERROR("open config file %s failed, %s", file, strerror(errno)); - goto error_out; - } - - table = toml_parse_file(fp, errbuf, sizeof(errbuf)); - if (table == NULL) - { - CONFIG_LOG_ERROR("parse config file %s failed, %s", file, errbuf); - goto error_out; - } - - if (parse_schedule_options(table, &config->sched_opts) != 0) - { - goto error_out; - } - - ret = 0; - -error_out: - if (table) - { - toml_free(table); - } - - if (fp) - { - fclose(fp); - } - - return ret; -} - -void stellar_config_print(const struct stellar_config *config) -{ - if (config == NULL) - { - return; - } - - // schedule config - CONFIG_LOG_DEBUG("schedule->merge_stat_interval : %ld", config->sched_opts.merge_stat_interval); - CONFIG_LOG_DEBUG("schedule->output_stat_interval : %ld", config->sched_opts.output_stat_interval); -} diff --git a/infra/core/stellar_config.h b/infra/core/stellar_config.h deleted file mode 100644 index 8cb5f07..0000000 --- a/infra/core/stellar_config.h +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -struct schedule_options -{ - uint64_t merge_stat_interval; // range: [1, 60000] (ms) - uint64_t output_stat_interval; // range: [1, 60000] (ms) -}; - -struct stellar_config -{ - struct schedule_options sched_opts; -}; - -int stellar_config_load(struct stellar_config *config, const char *file); -void stellar_config_print(const struct stellar_config *config); - -#ifdef __cplusplus -} -#endif diff --git a/infra/core/stellar_core.c b/infra/core/stellar_core.c index bcd8c96..197225a 100644 --- a/infra/core/stellar_core.c +++ b/infra/core/stellar_core.c @@ -13,7 +13,6 @@ #include "log_private.h" #include "stellar_stat.h" #include "stellar_core.h" -#include "stellar_config.h" #include "packet_private.h" #include "plugin_manager.h" #include "session_private.h" @@ -41,7 +40,6 @@ struct stellar_thread pthread_t tid; uint16_t idx; uint64_t is_runing; - uint64_t last_merge_thread_stat_timestamp; struct ip_reassembly *ip_reass; struct session_manager *sess_mgr; struct stellar *st; @@ -50,24 +48,27 @@ struct stellar_thread struct stellar_runtime { uint64_t need_exit; - uint64_t stat_last_output_ts; struct logger *logger; struct stellar_stat *stat; struct packet_io *packet_io; struct plugin_manager_schema *plug_mgr; struct stellar_thread threads[MAX_THREAD_NUM]; +}; - // config +struct stellar_config +{ uint64_t instance_id; struct session_manager_config *sess_mgr_cfg; struct ip_reassembly_config *ip_reass_cfg; struct packet_io_config *pkt_io_cfg; + struct stellar_stat_config *stat_cfg; }; struct stellar { - struct stellar_runtime runtime; + char magic[2]; // for check memory corruption struct stellar_config config; + struct stellar_runtime runtime; }; static __thread uint16_t __current_thread_idx = UINT16_MAX; @@ -131,17 +132,14 @@ static void *worker_thread(void *arg) struct session_manager *sess_mgr = thread->sess_mgr; struct session_manager_stat *sess_stat = session_manager_stat(sess_mgr); struct stellar *st = thread->st; - struct stellar_config *config = &st->config; struct stellar_runtime *runtime = &st->runtime; struct packet_io *packet_io = runtime->packet_io; struct plugin_manager_schema *plug_mgr = runtime->plug_mgr; struct thread_stat thr_stat = { - .packet_io = packet_io_stat(packet_io, thread->idx), - .ip_reassembly = ip_reassembly_stat(ip_reass), - .session_mgr = session_manager_stat(sess_mgr), + .pkt_io = packet_io_stat(packet_io, thread->idx), + .ip_reass = ip_reassembly_stat(ip_reass), + .sess_mgr = session_manager_stat(sess_mgr), }; - - uint64_t merge_stat_interval = config->sched_opts.merge_stat_interval; uint16_t thr_idx = thread->idx; __current_thread_idx = thr_idx; @@ -222,10 +220,6 @@ static void *worker_thread(void *arg) goto fast_path; } } - if (packet_get_session_id(pkt) == 0) - { - packet_set_session_id(pkt, session_get_id(sess)); - } plugin_manager_on_session_input(sess, pkt); fast_path: @@ -285,13 +279,7 @@ static void *worker_thread(void *arg) clean_session(sess_mgr, now_ms); ip_reassembly_expire(ip_reass, now_ms); plugin_manager_on_polling(plug_mgr); - - // per merge_stat_interval merge thread stat - if (now_ms - thread->last_merge_thread_stat_timestamp >= merge_stat_interval) - { - stellar_stat_merge(runtime->stat, &thr_stat, thread->idx); - thread->last_merge_thread_stat_timestamp = now_ms; - } + stellar_stat_merge(runtime->stat, &thr_stat, thr_idx, now_ms); if (nr_pkt_received == 0) { @@ -307,7 +295,7 @@ static void *worker_thread(void *arg) usleep(1000); // 1ms } - stellar_stat_merge(runtime->stat, &thr_stat, thread->idx); + stellar_stat_merge(runtime->stat, &thr_stat, thread->idx, UINT64_MAX); stellar_stat_print(runtime->stat, &thr_stat, thread->idx); ATOMIC_SET(&thread->is_runing, 0); @@ -323,25 +311,24 @@ static void *worker_thread(void *arg) static int stellar_thread_init(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; + struct stellar_config *config = &st->config; uint64_t now_ms = clock_get_real_time_ms(); - for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++) + for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++) { struct stellar_thread *thread = &runtime->threads[i]; thread->idx = i; thread->is_runing = 0; - thread->last_merge_thread_stat_timestamp = now_ms; - - runtime->sess_mgr_cfg->session_id_seed = runtime->instance_id << 8 | i; - thread->sess_mgr = session_manager_new(runtime->sess_mgr_cfg, now_ms); + config->sess_mgr_cfg->session_id_seed = config->instance_id << 8 | i; + thread->sess_mgr = session_manager_new(config->sess_mgr_cfg, now_ms); if (thread->sess_mgr == NULL) { CORE_LOG_ERROR("unable to create session manager"); return -1; } - thread->ip_reass = ip_reassembly_new(runtime->ip_reass_cfg, now_ms); + thread->ip_reass = ip_reassembly_new(config->ip_reass_cfg, now_ms); if (thread->ip_reass == NULL) { CORE_LOG_ERROR("unable to create ip reassemble manager"); @@ -356,9 +343,10 @@ static int stellar_thread_init(struct stellar *st) static void stellar_thread_clean(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; + struct stellar_config *config = &st->config; CORE_LOG_FATAL("cleaning worker thread context ..."); - for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++) + for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++) { struct stellar_thread *thread = &runtime->threads[i]; if (ATOMIC_READ(&thread->is_runing) == 0) @@ -373,8 +361,9 @@ static void stellar_thread_clean(struct stellar *st) static int stellar_thread_run(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; + struct stellar_config *config = &st->config; - for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++) + for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++) { struct stellar_thread *thread = &runtime->threads[i]; if (pthread_create(&thread->tid, NULL, worker_thread, (void *)thread) < 0) @@ -390,9 +379,10 @@ static int stellar_thread_run(struct stellar *st) static void stellar_thread_join(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; + struct stellar_config *config = &st->config; CORE_LOG_FATAL("waiting worker thread stop ..."); - for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++) + for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++) { struct stellar_thread *thread = &runtime->threads[i]; while (ATOMIC_READ(&thread->is_runing) == 1) @@ -426,7 +416,8 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg { return NULL; } - + st->magic[0] = 0x4d; + st->magic[1] = 0x5a; struct stellar_runtime *runtime = &st->runtime; struct stellar_config *config = &st->config; @@ -442,42 +433,43 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg CORE_LOG_FATAL("plugin config file : %s", plugin_cfg_file); CORE_LOG_FATAL("log config file : %s", log_cfg_file); - if (load_and_validate_toml_integer_config(stellar_cfg_file, "instance.id", (uint64_t *)&runtime->instance_id, 0, 4095) != 0) + if (load_and_validate_toml_integer_config(stellar_cfg_file, "instance.id", (uint64_t *)&config->instance_id, 0, 4095) != 0) { CORE_LOG_ERROR("unable to load instance id"); goto error_out; } - runtime->sess_mgr_cfg = session_manager_config_new(stellar_cfg_file); - if (runtime->sess_mgr_cfg == NULL) + config->sess_mgr_cfg = session_manager_config_new(stellar_cfg_file); + if (config->sess_mgr_cfg == NULL) { CORE_LOG_ERROR("unable to create session manager config"); goto error_out; } - runtime->ip_reass_cfg = ip_reassembly_config_new(stellar_cfg_file); - if (runtime->ip_reass_cfg == NULL) + config->ip_reass_cfg = ip_reassembly_config_new(stellar_cfg_file); + if (config->ip_reass_cfg == NULL) { CORE_LOG_ERROR("unable to create ip reassembly config"); goto error_out; } - runtime->pkt_io_cfg = packet_io_config_new(stellar_cfg_file); - if (runtime->pkt_io_cfg == NULL) + config->pkt_io_cfg = packet_io_config_new(stellar_cfg_file); + if (config->pkt_io_cfg == NULL) { CORE_LOG_ERROR("unable to create packet io config"); goto error_out; } - - packet_io_config_print(runtime->pkt_io_cfg); - session_manager_config_print(runtime->sess_mgr_cfg); - ip_reassembly_config_print(runtime->ip_reass_cfg); - if (stellar_config_load(config, stellar_cfg_file) != 0) + config->stat_cfg = stellar_stat_config_new(stellar_cfg_file); + if (config->stat_cfg == NULL) { - CORE_LOG_ERROR("unable to load config file"); + CORE_LOG_ERROR("unable to create stellar stat config"); goto error_out; } - stellar_config_print(config); - runtime->stat = stellar_stat_new(runtime->pkt_io_cfg->nr_worker_thread); + session_manager_config_print(config->sess_mgr_cfg); + ip_reassembly_config_print(config->ip_reass_cfg); + packet_io_config_print(config->pkt_io_cfg); + stellar_stat_config_print(config->stat_cfg); + + runtime->stat = stellar_stat_new(config->stat_cfg, clock_get_real_time_ms()); if (runtime->stat == NULL) { CORE_LOG_ERROR("unable to create stellar stat"); @@ -490,7 +482,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg goto error_out; } - runtime->packet_io = packet_io_new(runtime->pkt_io_cfg); + runtime->packet_io = packet_io_new(config->pkt_io_cfg); if (runtime->packet_io == NULL) { CORE_LOG_ERROR("unable to create packet io"); @@ -523,7 +515,6 @@ void stellar_run(struct stellar *st) } struct stellar_runtime *runtime = &st->runtime; - struct stellar_config *config = &st->config; if (stellar_thread_run(st) != 0) { @@ -531,14 +522,16 @@ void stellar_run(struct stellar *st) return; } - runtime->stat_last_output_ts = clock_get_real_time_ms(); while (!ATOMIC_READ(&runtime->need_exit)) { - if (clock_get_real_time_ms() - runtime->stat_last_output_ts >= config->sched_opts.output_stat_interval) + if (st->magic[0] != 0x4d || st->magic[1] != 0x5a) { - runtime->stat_last_output_ts = clock_get_real_time_ms(); - stellar_stat_output(runtime->stat); + CORE_LOG_FATAL("memory corruption detected"); + ATOMIC_SET(&runtime->need_exit, 1); + break; } + + stellar_stat_output(runtime->stat, clock_get_real_time_ms()); usleep(1000); // 1ms // only available in pcap mode @@ -546,13 +539,12 @@ void stellar_run(struct stellar *st) { ATOMIC_SET(&runtime->need_exit, 1); CORE_LOG_FATAL("notify worker thread to exit"); - stellar_stat_output(runtime->stat); // flush stat break; } } stellar_thread_join(st); - stellar_stat_output(runtime->stat); + stellar_stat_output(runtime->stat, UINT64_MAX); } void stellar_free(struct stellar *st) @@ -560,14 +552,18 @@ void stellar_free(struct stellar *st) if (st) { struct stellar_runtime *runtime = &st->runtime; + struct stellar_config *config = &st->config; stellar_thread_clean(st); packet_io_free(runtime->packet_io); plugin_manager_exit(runtime->plug_mgr); stellar_stat_free(runtime->stat); - packet_io_config_free(runtime->pkt_io_cfg); - ip_reassembly_config_free(runtime->ip_reass_cfg); - session_manager_config_free(runtime->sess_mgr_cfg); + + session_manager_config_free(config->sess_mgr_cfg); + ip_reassembly_config_free(config->ip_reass_cfg); + packet_io_config_free(config->pkt_io_cfg); + stellar_stat_config_free(config->stat_cfg); + CORE_LOG_FATAL("stellar exit\n"); log_free(runtime->logger); @@ -654,7 +650,7 @@ void stellar_send_build_packet(struct stellar *st, struct packet *pkt) int stellar_get_worker_thread_num(struct stellar *st) { - return st->runtime.pkt_io_cfg->nr_worker_thread; + return st->config.pkt_io_cfg->nr_worker_thread; } struct logger *stellar_get_logger(struct stellar *st) diff --git a/infra/core/stellar_stat.c b/infra/core/stellar_stat.c index 6171ecb..408929f 100644 --- a/infra/core/stellar_stat.c +++ b/infra/core/stellar_stat.c @@ -237,13 +237,14 @@ const char *name[] = { struct stellar_stat { - uint16_t nr_thread; - char output_file[2048]; + struct stellar_stat_config cfg; struct fieldstat_easy *fs; + uint64_t last_merge_stat_ts; + uint64_t last_output_stat_ts; + int flag[MAX_THREAD_NUM]; // IS_FREE or IS_BUSY struct thread_stat thr_stat[MAX_THREAD_NUM]; - uint64_t stat_idx[STAT_TYPE_MAX]; uint64_t stat_val[STAT_TYPE_MAX]; }; @@ -254,177 +255,177 @@ uint64_t get_stat_value_by_idx(const struct thread_stat *thr_stat, size_t idx) { // device packet case STAT_TYPE_PKTS_RX: - return thr_stat->packet_io->pkts_rx; + return thr_stat->pkt_io->pkts_rx; case STAT_TYPE_BYTES_RX: - return thr_stat->packet_io->bytes_rx; + return thr_stat->pkt_io->bytes_rx; case STAT_TYPE_PKTS_TX: - return thr_stat->packet_io->pkts_tx; + return thr_stat->pkt_io->pkts_tx; case STAT_TYPE_BYTES_TX: - return thr_stat->packet_io->bytes_tx; + return thr_stat->pkt_io->bytes_tx; // keep-alive packet case STAT_TYPE_KEEP_ALIVE_PKTS: - return thr_stat->packet_io->keep_alive_pkts; + return thr_stat->pkt_io->keep_alive_pkts; case STAT_TYPE_KEEP_ALIVE_BYTES: - return thr_stat->packet_io->keep_alive_bytes; + return thr_stat->pkt_io->keep_alive_bytes; // raw packet case STAT_TYPE_RAW_PKTS_RX: - return thr_stat->packet_io->raw_pkts_rx; + return thr_stat->pkt_io->raw_pkts_rx; case STAT_TYPE_RAW_BYTES_RX: - return thr_stat->packet_io->raw_bytes_rx; + return thr_stat->pkt_io->raw_bytes_rx; case STAT_TYPE_RAW_PKTS_TX: - return thr_stat->packet_io->raw_pkts_tx; + return thr_stat->pkt_io->raw_pkts_tx; case STAT_TYPE_RAW_BYTES_TX: - return thr_stat->packet_io->raw_bytes_tx; + return thr_stat->pkt_io->raw_bytes_tx; // drop packet case STAT_TYPE_PKTS_DROPPED: - return thr_stat->packet_io->pkts_dropped; + return thr_stat->pkt_io->pkts_dropped; case STAT_TYPE_BYTES_DROPPED: - return thr_stat->packet_io->bytes_dropped; + return thr_stat->pkt_io->bytes_dropped; // inject packet case STAT_TYPE_PKTS_INJECTED: - return thr_stat->packet_io->pkts_injected; + return thr_stat->pkt_io->pkts_injected; case STAT_TYPE_BYTES_INJECTED: - return thr_stat->packet_io->bytes_injected; + return thr_stat->pkt_io->bytes_injected; // ctrl packet case STAT_TYPE_CTRL_PKTS_RX: - return thr_stat->packet_io->ctrl_pkts_rx; + return thr_stat->pkt_io->ctrl_pkts_rx; case STAT_TYPE_CTRL_BYTES_RX: - return thr_stat->packet_io->ctrl_bytes_rx; + return thr_stat->pkt_io->ctrl_bytes_rx; case STAT_TYPE_CTRL_PKTS_TX: - return thr_stat->packet_io->ctrl_pkts_tx; + return thr_stat->pkt_io->ctrl_pkts_tx; case STAT_TYPE_CTRL_BYTES_TX: - return thr_stat->packet_io->ctrl_bytes_tx; + return thr_stat->pkt_io->ctrl_bytes_tx; // ipv4 reassembly case STAT_TYPE_IP4_DEFRAGS_EXPECTED: - return thr_stat->ip_reassembly->ip4_defrags_expected; + return thr_stat->ip_reass->ip4_defrags_expected; case STAT_TYPE_IP4_DEFRAGS_SUCCEED: - return thr_stat->ip_reassembly->ip4_defrags_succeed; + return thr_stat->ip_reass->ip4_defrags_succeed; case STAT_TYPE_IP4_DEFRAGS_FAILED_TIMEOUT: - return thr_stat->ip_reassembly->ip4_defrags_failed_timeout; + return thr_stat->ip_reass->ip4_defrags_failed_timeout; case STAT_TYPE_IP4_DEFRAGS_FAILED_INVALID_LENGTH: - return thr_stat->ip_reassembly->ip4_defrags_failed_invalid_length; + return thr_stat->ip_reass->ip4_defrags_failed_invalid_length; case STAT_TYPE_IP4_DEFRAGS_FAILED_OVERLAP: - return thr_stat->ip_reassembly->ip4_defrags_failed_overlap; + return thr_stat->ip_reass->ip4_defrags_failed_overlap; case STAT_TYPE_IP4_DEFRAGS_FAILED_TOO_MANY_FRAG: - return thr_stat->ip_reassembly->ip4_defrags_failed_too_many_frag; + return thr_stat->ip_reass->ip4_defrags_failed_too_many_frag; case STAT_TYPE_IP4_FRAGS: - return thr_stat->ip_reassembly->ip4_frags; + return thr_stat->ip_reass->ip4_frags; case STAT_TYPE_IP4_FRAGS_FREED: - return thr_stat->ip_reassembly->ip4_frags_freed; + return thr_stat->ip_reass->ip4_frags_freed; case STAT_TYPE_IP4_FRAGS_BUFFERED: - return thr_stat->ip_reassembly->ip4_frags_buffered; + return thr_stat->ip_reass->ip4_frags_buffered; case STAT_TYPE_IP4_FRAGS_BYPASS_NO_BUFFER: - return thr_stat->ip_reassembly->ip4_frags_bypass_no_buffer; + return thr_stat->ip_reass->ip4_frags_bypass_no_buffer; case STAT_TYPE_IP4_FRAGS_BYPASS_DUP_FIST_FRAG: - return thr_stat->ip_reassembly->ip4_frags_bypass_dup_fist_frag; + return thr_stat->ip_reass->ip4_frags_bypass_dup_fist_frag; case STAT_TYPE_IP4_FRAGS_BYPASS_DUP_LAST_FRAG: - return thr_stat->ip_reassembly->ip4_frags_bypass_dup_last_frag; + return thr_stat->ip_reass->ip4_frags_bypass_dup_last_frag; // ipv6 reassembly case STAT_TYPE_IP6_DEFRAGS_EXPECTED: - return thr_stat->ip_reassembly->ip6_defrags_expected; + return thr_stat->ip_reass->ip6_defrags_expected; case STAT_TYPE_IP6_DEFRAGS_SUCCEED: - return thr_stat->ip_reassembly->ip6_defrags_succeed; + return thr_stat->ip_reass->ip6_defrags_succeed; case STAT_TYPE_IP6_DEFRAGS_FAILED_TIMEOUT: - return thr_stat->ip_reassembly->ip6_defrags_failed_timeout; + return thr_stat->ip_reass->ip6_defrags_failed_timeout; case STAT_TYPE_IP6_DEFRAGS_FAILED_INVALID_LENGTH: - return thr_stat->ip_reassembly->ip6_defrags_failed_invalid_length; + return thr_stat->ip_reass->ip6_defrags_failed_invalid_length; case STAT_TYPE_IP6_DEFRAGS_FAILED_OVERLAP: - return thr_stat->ip_reassembly->ip6_defrags_failed_overlap; + return thr_stat->ip_reass->ip6_defrags_failed_overlap; case STAT_TYPE_IP6_DEFRAGS_FAILED_TOO_MANY_FRAG: - return thr_stat->ip_reassembly->ip6_defrags_failed_too_many_frag; + return thr_stat->ip_reass->ip6_defrags_failed_too_many_frag; case STAT_TYPE_IP6_FRAGS: - return thr_stat->ip_reassembly->ip6_frags; + return thr_stat->ip_reass->ip6_frags; case STAT_TYPE_IP6_FRAGS_FREED: - return thr_stat->ip_reassembly->ip6_frags_freed; + return thr_stat->ip_reass->ip6_frags_freed; case STAT_TYPE_IP6_FRAGS_BUFFERED: - return thr_stat->ip_reassembly->ip6_frags_buffered; + return thr_stat->ip_reass->ip6_frags_buffered; case STAT_TYPE_IP6_FRAGS_BYPASS_NO_BUFFER: - return thr_stat->ip_reassembly->ip6_frags_bypass_no_buffer; + return thr_stat->ip_reass->ip6_frags_bypass_no_buffer; case STAT_TYPE_IP6_FRAGS_BYPASS_DUP_FIST_FRAG: - return thr_stat->ip_reassembly->ip6_frags_bypass_dup_fist_frag; + return thr_stat->ip_reass->ip6_frags_bypass_dup_fist_frag; case STAT_TYPE_IP6_FRAGS_BYPASS_DUP_LAST_FRAG: - return thr_stat->ip_reassembly->ip6_frags_bypass_dup_last_frag; + return thr_stat->ip_reass->ip6_frags_bypass_dup_last_frag; // TCP session case STAT_TYPE_HISTORY_TCP_SESSIONS: - return thr_stat->session_mgr->history_tcp_sessions; + return thr_stat->sess_mgr->history_tcp_sessions; case STAT_TYPE_TCP_SESS_USED: - return thr_stat->session_mgr->tcp_sess_used; + return thr_stat->sess_mgr->tcp_sess_used; case STAT_TYPE_TCP_SESS_OPENING: - return thr_stat->session_mgr->tcp_sess_opening; + return thr_stat->sess_mgr->tcp_sess_opening; case STAT_TYPE_TCP_SESS_ACTIVE: - return thr_stat->session_mgr->tcp_sess_active; + return thr_stat->sess_mgr->tcp_sess_active; case STAT_TYPE_TCP_SESS_CLOSING: - return thr_stat->session_mgr->tcp_sess_closing; + return thr_stat->sess_mgr->tcp_sess_closing; case STAT_TYPE_TCP_SESS_DISCARD: - return thr_stat->session_mgr->tcp_sess_discard; + return thr_stat->sess_mgr->tcp_sess_discard; case STAT_TYPE_TCP_SESS_CLOSED: - return thr_stat->session_mgr->tcp_sess_closed; + return thr_stat->sess_mgr->tcp_sess_closed; // UDP session case STAT_TYPE_HISTORY_UDP_SESSIONS: - return thr_stat->session_mgr->history_udp_sessions; + return thr_stat->sess_mgr->history_udp_sessions; case STAT_TYPE_UDP_SESS_USED: - return thr_stat->session_mgr->udp_sess_used; + return thr_stat->sess_mgr->udp_sess_used; case STAT_TYPE_UDP_SESS_OPENING: - return thr_stat->session_mgr->udp_sess_opening; + return thr_stat->sess_mgr->udp_sess_opening; case STAT_TYPE_UDP_SESS_ACTIVE: - return thr_stat->session_mgr->udp_sess_active; + return thr_stat->sess_mgr->udp_sess_active; case STAT_TYPE_UDP_SESS_CLOSING: - return thr_stat->session_mgr->udp_sess_closing; + return thr_stat->sess_mgr->udp_sess_closing; case STAT_TYPE_UDP_SESS_DISCARD: - return thr_stat->session_mgr->udp_sess_discard; + return thr_stat->sess_mgr->udp_sess_discard; case STAT_TYPE_UDP_SESS_CLOSED: - return thr_stat->session_mgr->udp_sess_closed; + return thr_stat->sess_mgr->udp_sess_closed; // Evicted session case STAT_TYPE_TCP_SESS_EVICTED: - return thr_stat->session_mgr->tcp_sess_evicted; + return thr_stat->sess_mgr->tcp_sess_evicted; case STAT_TYPE_UDP_SESS_EVICTED: - return thr_stat->session_mgr->udp_sess_evicted; + return thr_stat->sess_mgr->udp_sess_evicted; // Packet case STAT_TYPE_UDP_PKTS_BYPASS_TABLE_FULL: - return thr_stat->session_mgr->udp_pkts_bypass_table_full; + return thr_stat->sess_mgr->udp_pkts_bypass_table_full; case STAT_TYPE_TCP_PKTS_BYPASS_TABLE_FULL: - return thr_stat->session_mgr->tcp_pkts_bypass_table_full; + return thr_stat->sess_mgr->tcp_pkts_bypass_table_full; case STAT_TYPE_TCP_PKTS_BYPASS_SESSION_NOT_FOUND: - return thr_stat->session_mgr->tcp_pkts_bypass_session_not_found; + return thr_stat->sess_mgr->tcp_pkts_bypass_session_not_found; case STAT_TYPE_TCP_PKTS_BYPASS_DUPLICATED: - return thr_stat->session_mgr->tcp_pkts_bypass_duplicated; + return thr_stat->sess_mgr->tcp_pkts_bypass_duplicated; case STAT_TYPE_UDP_PKTS_BYPASS_DUPLICATED: - return thr_stat->session_mgr->udp_pkts_bypass_duplicated; + return thr_stat->sess_mgr->udp_pkts_bypass_duplicated; case STAT_TYPE_UDP_PKTS_BYPASS_SESSION_EVICTED: - return thr_stat->session_mgr->udp_pkts_bypass_session_evicted; + return thr_stat->sess_mgr->udp_pkts_bypass_session_evicted; // TCP segments case STAT_TYPE_TCP_SEGS_INPUT: - return thr_stat->session_mgr->tcp_segs_input; + return thr_stat->sess_mgr->tcp_segs_input; case STAT_TYPE_TCP_SEGS_CONSUMED: - return thr_stat->session_mgr->tcp_segs_consumed; + return thr_stat->sess_mgr->tcp_segs_consumed; case STAT_TYPE_TCP_SEGS_TIMEOUT: - return thr_stat->session_mgr->tcp_segs_timeout; + return thr_stat->sess_mgr->tcp_segs_timeout; case STAT_TYPE_TCP_SEGS_RETRANSMITED: - return thr_stat->session_mgr->tcp_segs_retransmited; + return thr_stat->sess_mgr->tcp_segs_retransmited; case STAT_TYPE_TCP_SEGS_OVERLAPPED: - return thr_stat->session_mgr->tcp_segs_overlapped; + return thr_stat->sess_mgr->tcp_segs_overlapped; case STAT_TYPE_TCP_SEGS_OMITTED_TOO_MANY: - return thr_stat->session_mgr->tcp_segs_omitted_too_many; + return thr_stat->sess_mgr->tcp_segs_omitted_too_many; case STAT_TYPE_TCP_SEGS_INORDER: - return thr_stat->session_mgr->tcp_segs_inorder; + return thr_stat->sess_mgr->tcp_segs_inorder; case STAT_TYPE_TCP_SEGS_REORDERED: - return thr_stat->session_mgr->tcp_segs_reordered; + return thr_stat->sess_mgr->tcp_segs_reordered; case STAT_TYPE_TCP_SEGS_BUFFERED: - return thr_stat->session_mgr->tcp_segs_buffered; + return thr_stat->sess_mgr->tcp_segs_buffered; case STAT_TYPE_TCP_SEGS_FREED: - return thr_stat->session_mgr->tcp_segs_freed; + return thr_stat->sess_mgr->tcp_segs_freed; default: assert(0); @@ -432,10 +433,55 @@ uint64_t get_stat_value_by_idx(const struct thread_stat *thr_stat, size_t idx) } } +struct stellar_stat_config *stellar_stat_config_new(const char *toml_file) +{ + if (toml_file == NULL) + { + return NULL; + } + + struct stellar_stat_config *cfg = (struct stellar_stat_config *)calloc(1, sizeof(struct stellar_stat_config)); + if (cfg == NULL) + { + return NULL; + } + + int ret = 0; + ret += load_and_validate_toml_integer_config(toml_file, "packet_io.nr_worker_thread", (uint64_t *)&cfg->nr_worker_thread, 1, MAX_THREAD_NUM); + ret += load_and_validate_toml_integer_config(toml_file, "stat.merge_interval_ms", (uint64_t *)&cfg->merge_interval_ms, 0, 60000); + ret += load_and_validate_toml_integer_config(toml_file, "stat.output_interval_ms", (uint64_t *)&cfg->output_interval_ms, 0, 60000); + + if (ret != 0) + { + stellar_stat_config_free(cfg); + return NULL; + } + + return cfg; +} + +void stellar_stat_config_free(struct stellar_stat_config *cfg) +{ + if (cfg) + { + free(cfg); + cfg = NULL; + } +} + +void stellar_stat_config_print(const struct stellar_stat_config *cfg) +{ + if (cfg) + { + STAT_LOG_INFO("stat.merge_interval_ms : %lu", cfg->merge_interval_ms); + STAT_LOG_INFO("stat.output_interval_ms : %lu", cfg->output_interval_ms); + } +} + // python3 -m pip install prettytable // python3 -m pip install jinja2 // /opt/MESA/bin/fieldstat_exporter.py local -j log/stellar_fs4.json -e -l --clear-screen -struct stellar_stat *stellar_stat_new(uint16_t nr_thread) +struct stellar_stat *stellar_stat_new(const struct stellar_stat_config *cfg, uint64_t now_ms) { struct stellar_stat *stat = (struct stellar_stat *)calloc(1, sizeof(struct stellar_stat)); if (stat == NULL) @@ -443,7 +489,7 @@ struct stellar_stat *stellar_stat_new(uint16_t nr_thread) return NULL; } - snprintf(stat->output_file, sizeof(stat->output_file), "./log/stellar_fs4.json"); + memcpy(&stat->cfg, cfg, sizeof(struct stellar_stat_config)); stat->fs = fieldstat_easy_new(1, "stellar", NULL, 0); if (stat->fs == NULL) @@ -452,7 +498,6 @@ struct stellar_stat *stellar_stat_new(uint16_t nr_thread) goto error_out; } - stat->nr_thread = nr_thread; for (int i = 0; i < MAX_THREAD_NUM; i++) { stat->flag[i] = IS_FREE; @@ -463,6 +508,9 @@ struct stellar_stat *stellar_stat_new(uint16_t nr_thread) stat->stat_idx[i] = fieldstat_easy_register_counter(stat->fs, name[i]); } + stat->last_merge_stat_ts = now_ms; + stat->last_output_stat_ts = now_ms; + return stat; error_out: @@ -484,9 +532,15 @@ void stellar_stat_free(struct stellar_stat *stat) } } -void stellar_stat_output(struct stellar_stat *stat) +void stellar_stat_output(struct stellar_stat *stat, uint64_t now_ms) { - for (uint16_t i = 0; i < stat->nr_thread; i++) + if (now_ms - stat->last_output_stat_ts < stat->cfg.output_interval_ms) + { + return; + } + stat->last_output_stat_ts = now_ms; + + for (uint16_t i = 0; i < stat->cfg.nr_worker_thread; i++) { if (ATOMIC_READ(&(stat->flag[i])) == IS_BUSY) { @@ -510,10 +564,10 @@ void stellar_stat_output(struct stellar_stat *stat) fieldstat_easy_output(stat->fs, &buff, &len); if (buff) { - FILE *fp = fopen(stat->output_file, "w+"); + FILE *fp = fopen("./log/stellar_fs4.json", "w+"); if (fp == NULL) { - STAT_LOG_ERROR("failed to open file: %s, %s", stat->output_file, strerror(errno)); + STAT_LOG_ERROR("failed to open file: ./log/stellar_fs4.json, %s", strerror(errno)); } else { @@ -530,12 +584,18 @@ void stellar_stat_output(struct stellar_stat *stat) } } -void stellar_stat_merge(struct stellar_stat *stat, const struct thread_stat *thr_stat, uint16_t thr_idx) +void stellar_stat_merge(struct stellar_stat *stat, const struct thread_stat *thr_stat, uint16_t thr_idx, uint64_t now_ms) { + if (now_ms - stat->last_merge_stat_ts < stat->cfg.merge_interval_ms) + { + return; + } + if (ATOMIC_READ(&(stat->flag[thr_idx])) == IS_FREE) { memcpy(&stat->thr_stat[thr_idx], thr_stat, sizeof(struct thread_stat)); ATOMIC_SET(&(stat->flag[thr_idx]), IS_BUSY); + stat->last_merge_stat_ts = now_ms; } } diff --git a/infra/core/stellar_stat.h b/infra/core/stellar_stat.h index fbef78d..3560ce8 100644 --- a/infra/core/stellar_stat.h +++ b/infra/core/stellar_stat.h @@ -11,16 +11,27 @@ extern "C" struct thread_stat { - struct packet_io_stat *packet_io; - struct ip_reassembly_stat *ip_reassembly; - struct session_manager_stat *session_mgr; + struct packet_io_stat *pkt_io; + struct ip_reassembly_stat *ip_reass; + struct session_manager_stat *sess_mgr; }; +struct stellar_stat_config +{ + uint16_t nr_worker_thread; // range [1, MAX_THREAD_NUM] + uint64_t merge_interval_ms; // range: [0, 60000] (ms) + uint64_t output_interval_ms; // range: [0, 60000] (ms) +}; + +struct stellar_stat_config *stellar_stat_config_new(const char *toml_file); +void stellar_stat_config_free(struct stellar_stat_config *cfg); +void stellar_stat_config_print(const struct stellar_stat_config *cfg); + struct stellar_stat; -struct stellar_stat *stellar_stat_new(uint16_t nr_thread); +struct stellar_stat *stellar_stat_new(const struct stellar_stat_config *cfg, uint64_t now_ms); void stellar_stat_free(struct stellar_stat *stat); -void stellar_stat_output(struct stellar_stat *stat); -void stellar_stat_merge(struct stellar_stat *stat, const struct thread_stat *thr_stat, uint16_t thr_idx); +void stellar_stat_output(struct stellar_stat *stat, uint64_t now_ms); +void stellar_stat_merge(struct stellar_stat *stat, const struct thread_stat *thr_stat, uint16_t thr_idx, uint64_t now_ms); void stellar_stat_print(struct stellar_stat *stat, const struct thread_stat *thr_stat, uint16_t thr_idx); #ifdef __cplusplus diff --git a/test/decoders/http/test_based_on_stellar/env/stellar.toml b/test/decoders/http/test_based_on_stellar/env/stellar.toml index e43a206..308c884 100644 --- a/test/decoders/http/test_based_on_stellar/env/stellar.toml +++ b/test/decoders/http/test_based_on_stellar/env/stellar.toml @@ -59,6 +59,6 @@ timeout_ms = 100 # range: [1, 60000] (ms) buffered_segments_max = 256 # range: [2, 4096] per flow -[schedule] - merge_stat_interval = 50 # range: [1, 60000] (ms) - output_stat_interval = 200 # range: [1, 60000] (ms) +[stat] + merge_interval_ms = 500 # range: [0, 60000] (ms) + output_interval_ms = 1000 # range: [0, 60000] (ms) diff --git a/test/packet_inject/conf/stellar.toml b/test/packet_inject/conf/stellar.toml index 50963b6..cb8fe4b 100644 --- a/test/packet_inject/conf/stellar.toml +++ b/test/packet_inject/conf/stellar.toml @@ -59,6 +59,6 @@ timeout_ms = 10000 # range: [1, 60000] (ms) buffered_segments_max = 256 # range: [2, 4096] per flow -[schedule] - merge_stat_interval = 500 # range: [1, 60000] (ms) - output_stat_interval = 2000 # range: [1, 60000] (ms) +[stat] + merge_interval_ms = 500 # range: [0, 60000] (ms) + output_interval_ms = 1000 # range: [0, 60000] (ms)