From 79e70f71457aebc58c157dda09cb8656f8cdde5e Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Tue, 27 Aug 2024 16:19:20 +0800 Subject: [PATCH] feature: consume all packets and free all sessions before exit --- conf/stellar.toml | 5 +- src/core/stellar_config.cpp | 13 ++ src/core/stellar_config.h | 5 +- src/core/stellar_core.cpp | 117 ++++++++---------- src/ip_reassembly/ip_reassembly.h | 2 +- src/packet_io/dumpfile_io.cpp | 35 +++--- src/session/session_manager.h | 2 +- test/decoders/http/plugin_test_main.cpp | 2 +- .../test_based_on_stellar/env/stellar.toml | 13 +- test/packet_inject/conf/stellar.toml | 5 +- 10 files changed, 103 insertions(+), 96 deletions(-) diff --git a/conf/stellar.toml b/conf/stellar.toml index 3d01ef4..b00f519 100644 --- a/conf/stellar.toml +++ b/conf/stellar.toml @@ -59,8 +59,9 @@ tcp_reassembly_max_segments = 256 # range: [2, 4096] [schedule] # Note: free_expired_session_interval determines the precision of session_manager timeout -free_expired_session_interval = 50 # range: [1, 60000] (ms) -free_expired_session_batch = 1000 # range: [1, 60000] +free_expired_session_interval = 50 # range: [1, 60000] (ms) +free_expired_session_batch = 1000 # range: [1, 60000] +froce_session_expire_before_exit = 0 # 1: force session to expire before exit, 0: wait for session to naturally expire before exit. # Note: free_expired_ip_frag_interval determines the precision of ip_reassembly timeout free_expired_ip_frag_interval = 50 # range: [1, 60000] (ms) diff --git a/src/core/stellar_config.cpp b/src/core/stellar_config.cpp index ddef81b..36d95e3 100644 --- a/src/core/stellar_config.cpp +++ b/src/core/stellar_config.cpp @@ -473,6 +473,19 @@ static int parse_schedule_options(toml_table_t *root, struct schedule_options *o return -1; } + ptr = toml_raw_in(table, "froce_session_expire_before_exit"); + if (ptr == NULL) + { + CONFIG_LOG_ERROR("config file missing schedule->froce_session_expire_before_exit"); + return -1; + } + opts->froce_session_expire_before_exit = atoll(ptr); + if (opts->froce_session_expire_before_exit != 0 && opts->froce_session_expire_before_exit != 1) + { + CONFIG_LOG_ERROR("config file invalid schedule->froce_session_expire_before_exit %ld, range [0, 1]", opts->froce_session_expire_before_exit); + return -1; + } + ptr = toml_raw_in(table, "free_expired_ip_frag_interval"); if (ptr == NULL) { diff --git a/src/core/stellar_config.h b/src/core/stellar_config.h index d1d5500..5c53c8b 100644 --- a/src/core/stellar_config.h +++ b/src/core/stellar_config.h @@ -12,8 +12,9 @@ extern "C" struct schedule_options { // Note: free_expired_session_interval determines the precision of session_manager timeout - uint64_t free_expired_session_interval; // range: [1, 60000] (ms) - uint64_t free_expired_session_batch; // range: [1, 60000] + uint64_t free_expired_session_interval; // range: [1, 60000] (ms) + uint64_t free_expired_session_batch; // range: [1, 60000] + uint64_t froce_session_expire_before_exit; // range: [0, 1] // Note: free_expired_ip_frag_interval determines the precision of ip_reassembly timeout uint64_t free_expired_ip_frag_interval; // range: [1, 60000] (ms) diff --git a/src/core/stellar_core.cpp b/src/core/stellar_core.cpp index c3a9acb..ac18870 100644 --- a/src/core/stellar_core.cpp +++ b/src/core/stellar_core.cpp @@ -37,28 +37,15 @@ static const char logo_str[] = " \\__ \\ | |_ | __/ | | | | | (_| | | |\n" " |___/ \\__| \\___| |_| |_| \\__,_| |_|\n"; -struct schedule_data -{ - uint64_t last_free_expired_session_timestamp; - uint64_t last_free_expired_ip_frag_timestamp; - uint64_t last_merge_thread_stat_timestamp; - - uint64_t free_expired_session_interval; - uint64_t free_expired_session_batch; - - uint64_t free_expired_ip_frag_interval; - uint64_t free_expired_ip_frag_batch; - - uint64_t merge_stat_interval; - uint64_t packet_io_yield_interval; -}; - struct stellar_thread { pthread_t tid; uint16_t idx; uint64_t is_runing; - struct schedule_data sched_data; + uint64_t need_exit; + uint64_t last_free_expired_session_timestamp; + uint64_t last_free_expired_ip_frag_timestamp; + uint64_t last_merge_thread_stat_timestamp; struct snowflake *snowflake; struct ip_reassembly *ip_mgr; struct session_manager *sess_mgr; @@ -202,11 +189,19 @@ static void *work_thread(void *arg) struct stellar_thread *thread = (struct stellar_thread *)arg; struct ip_reassembly *ip_reass = thread->ip_mgr; struct session_manager *sess_mgr = thread->sess_mgr; + struct session_manager_stat *sess_stat = session_manager_stat(sess_mgr); struct stellar *st = thread->st; + struct stellar_config *config = &st->config; struct stellar_runtime *runtime = &st->runtime; - struct schedule_data *sched_data = &thread->sched_data; struct packet_io *packet_io = runtime->packet_io; struct plugin_manager_schema *plug_mgr = runtime->plug_mgr; + uint64_t free_expired_session_interval = config->sched_opts.free_expired_session_interval; + uint64_t free_expired_session_batch = config->sched_opts.free_expired_session_batch; + uint64_t froce_session_expire_before_exit = config->sched_opts.froce_session_expire_before_exit; + uint64_t free_expired_ip_frag_interval = config->sched_opts.free_expired_ip_frag_interval; + uint64_t free_expired_ip_frag_batch = config->sched_opts.free_expired_ip_frag_batch; + uint64_t merge_stat_interval = config->sched_opts.merge_stat_interval; + uint64_t packet_io_yield_interval = config->sched_opts.packet_io_yield_interval; uint16_t thr_idx = thread->idx; __current_thread_idx = thr_idx; @@ -232,7 +227,7 @@ static void *work_thread(void *arg) ATOMIC_SET(&thread->is_runing, 1); CORE_LOG_FATAL("worker thread %d runing", thr_idx); - while (ATOMIC_READ(&runtime->need_exit) == 0) + while (ATOMIC_READ(&thread->need_exit) == 0) { /* * We use the system's real time instead of monotonic time for the following reasons: @@ -345,29 +340,43 @@ static void *work_thread(void *arg) plugin_manager_on_polling(plug_mgr); // per free_expired_session_interval MAX free_expired_session_batch sessions are released - if (now_ms - sched_data->last_free_expired_session_timestamp >= sched_data->free_expired_session_interval) + if (now_ms - thread->last_free_expired_session_timestamp >= free_expired_session_interval) { - free_expired_sessions(sess_mgr, sched_data->free_expired_session_batch, now_ms); - sched_data->last_free_expired_session_timestamp = now_ms; + free_expired_sessions(sess_mgr, free_expired_session_batch, now_ms); + thread->last_free_expired_session_timestamp = now_ms; } // per merge_stat_interval merge thread stat - if (now_ms - sched_data->last_merge_thread_stat_timestamp >= sched_data->merge_stat_interval) + if (now_ms - thread->last_merge_thread_stat_timestamp >= merge_stat_interval) { merge_thread_stat(thread); - sched_data->last_merge_thread_stat_timestamp = now_ms; + thread->last_merge_thread_stat_timestamp = now_ms; } // per free_expired_ip_frag_interval MAX free_expired_ip_frag_batch ip fragments are released - if (now_ms - sched_data->last_free_expired_ip_frag_timestamp >= sched_data->free_expired_ip_frag_interval) + if (now_ms - thread->last_free_expired_ip_frag_timestamp >= free_expired_ip_frag_interval) { - ip_reassembly_expire(ip_reass, sched_data->free_expired_ip_frag_batch, now_ms); - sched_data->last_free_expired_ip_frag_timestamp = now_ms; + ip_reassembly_expire(ip_reass, free_expired_ip_frag_batch, now_ms); + thread->last_free_expired_ip_frag_timestamp = now_ms; } if (nr_recv == 0) { - packet_io_yield(packet_io, thr_idx, sched_data->packet_io_yield_interval); + packet_io_yield(packet_io, thr_idx, packet_io_yield_interval); + } + } + + if (froce_session_expire_before_exit) + { + free_expired_sessions(sess_mgr, UINT64_MAX, UINT64_MAX); + } + else + { + while (sess_stat->tcp_sess_used > 0 || sess_stat->udp_sess_used > 0) + { + now_ms = clock_get_real_time_ms(); + free_expired_sessions(sess_mgr, free_expired_session_batch, now_ms); + usleep(1000); // 1ms } } @@ -384,27 +393,6 @@ static void *work_thread(void *arg) * Stellar Main Function ******************************************************************************/ -static int all_session_have_freed(struct stellar_runtime *runtime, struct stellar_config *config) -{ - for (int i = 0; i < config->pkt_io_opts.nr_threads; i++) - { - struct session_manager *sess_mgr = runtime->threads[i].sess_mgr; - struct session_manager_stat *sess_stat = session_manager_stat(sess_mgr); - - if (ATOMIC_READ(&sess_stat->tcp_sess_used) != 0) - { - return 0; - } - - if (ATOMIC_READ(&sess_stat->udp_sess_used) != 0) - { - return 0; - } - } - - return 1; -} - static int stellar_thread_init(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; @@ -417,18 +405,9 @@ static int stellar_thread_init(struct stellar *st) thread->idx = i; thread->is_runing = 0; - thread->sched_data.last_free_expired_session_timestamp = now_ms; - thread->sched_data.last_free_expired_ip_frag_timestamp = now_ms; - thread->sched_data.last_merge_thread_stat_timestamp = now_ms; - - thread->sched_data.free_expired_session_interval = config->sched_opts.free_expired_session_interval; - thread->sched_data.free_expired_session_batch = config->sched_opts.free_expired_session_batch; - - thread->sched_data.free_expired_ip_frag_interval = config->sched_opts.free_expired_ip_frag_interval; - thread->sched_data.free_expired_ip_frag_batch = config->sched_opts.free_expired_ip_frag_batch; - - thread->sched_data.merge_stat_interval = config->sched_opts.merge_stat_interval; - thread->sched_data.packet_io_yield_interval = config->sched_opts.packet_io_yield_interval; + thread->last_free_expired_session_timestamp = now_ms; + thread->last_free_expired_ip_frag_timestamp = now_ms; + thread->last_merge_thread_stat_timestamp = now_ms; thread->snowflake = snowflake_new(i, config->snowflake_opts.snowflake_base, config->snowflake_opts.snowflake_offset); if (thread->snowflake == NULL) @@ -623,16 +602,21 @@ void stellar_run(struct stellar *st) } usleep(1000); // 1ms - // only available in dump file mode, automatically exits when all sessions have been released - if (packet_io_isbreak(runtime->packet_io) && all_session_have_freed(runtime, config)) + // only available in dumpfile mode + if (packet_io_isbreak(runtime->packet_io)) { - CORE_LOG_FATAL("all sessions have been released, notify threads to exit"); + for (uint16_t i = 0; i < config->pkt_io_opts.nr_threads; i++) + { + struct stellar_thread *thread = &runtime->threads[i]; + ATOMIC_SET(&thread->need_exit, 1); + } + CORE_LOG_FATAL("notify worker thread to exit"); stellar_stat_output(runtime->stat); // flush stat - ATOMIC_SET(&runtime->need_exit, 1); + break; } } - // berfore exit, output last stat + stellar_thread_join(st); stellar_stat_output(runtime->stat); } @@ -642,7 +626,6 @@ void stellar_free(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; - stellar_thread_join(st); stellar_thread_clean(st); packet_io_free(runtime->packet_io); plugin_manager_exit(runtime->plug_mgr); diff --git a/src/ip_reassembly/ip_reassembly.h b/src/ip_reassembly/ip_reassembly.h index 5f35f6a..456e09b 100644 --- a/src/ip_reassembly/ip_reassembly.h +++ b/src/ip_reassembly/ip_reassembly.h @@ -13,7 +13,7 @@ struct ip_reassembly_options uint32_t bucket_num; // range: [1, 4294967295] }; -struct ip_reassembly_stat +struct __attribute__((aligned(64))) ip_reassembly_stat { // IPv4 frag stat uint64_t ip4_defrags_expected; diff --git a/src/packet_io/dumpfile_io.cpp b/src/packet_io/dumpfile_io.cpp index edda893..138ff3f 100644 --- a/src/packet_io/dumpfile_io.cpp +++ b/src/packet_io/dumpfile_io.cpp @@ -35,6 +35,9 @@ struct dumpfile_io uint64_t io_thread_need_exit; uint64_t io_thread_is_runing; uint64_t io_thread_wait_exit; + + uint64_t read_pcap_files; + uint64_t read_pcap_pkts; }; struct pcap_pkt @@ -121,12 +124,6 @@ static void packet_queue_pop(struct packet_queue *queue, void **data) queue->head = (queue->head + 1) % queue->size; } -static int packet_queue_isempty(struct packet_queue *queue) -{ - uint64_t read = ATOMIC_READ(&queue->queue[queue->head]); - return read == 0; -} - /****************************************************************************** * Private API -- utils ******************************************************************************/ @@ -146,6 +143,7 @@ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_ pcap_pkt->len = h->caplen; pcap_pkt->ts = h->ts; memcpy((char *)pcap_pkt->data, bytes, h->caplen); + ATOMIC_INC(&handle->read_pcap_pkts); // calculate packet hash struct packet pkt; @@ -188,6 +186,7 @@ static int dumpfile_handler(struct dumpfile_io *handle, const char *pcap_file) PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); return -1; } + handle->read_pcap_files++; pcap_loop(handle->pcap, -1, pcap_pkt_handler, (u_char *)handle); pcap_close(handle->pcap); @@ -196,16 +195,22 @@ static int dumpfile_handler(struct dumpfile_io *handle, const char *pcap_file) return 0; } -static int all_packet_processed(struct dumpfile_io *handle) +static int all_packet_consumed(struct dumpfile_io *handle) { + uint64_t consumed_pkts = 0; + uint64_t read_pcap_pkts = ATOMIC_READ(&handle->read_pcap_pkts); for (uint16_t i = 0; i < handle->nr_threads; i++) { - if (!packet_queue_isempty(handle->queue[i])) - { - return 0; - } + consumed_pkts += ATOMIC_READ(&handle->stat[i].pkts_rx); + } + if (consumed_pkts == read_pcap_pkts) + { + return 1; + } + else + { + return 0; } - return 1; } static void *dumpfile_thread(void *arg) @@ -264,7 +269,7 @@ static void *dumpfile_thread(void *arg) erro_out: while (ATOMIC_READ(&handle->io_thread_need_exit) == 0) { - if (all_packet_processed(handle)) + if (all_packet_consumed(handle)) { ATOMIC_SET(&handle->io_thread_wait_exit, 1); } @@ -330,6 +335,8 @@ void dumpfile_io_free(struct dumpfile_io *handle) usleep(1000); } + PACKET_IO_LOG_FATAL("dumpfile io thread read pcap files %lu, read pcap pkts %lu", handle->read_pcap_files, ATOMIC_READ(&handle->read_pcap_pkts)); + struct pcap_pkt *pcap_pkt = NULL; for (uint16_t i = 0; i < handle->nr_threads; i++) { @@ -380,7 +387,7 @@ uint16_t dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struc } else { - stat->pkts_rx++; + ATOMIC_INC(&stat->pkts_rx); stat->bytes_rx += pcap_pkt->len; stat->raw_pkts_rx++; diff --git a/src/session/session_manager.h b/src/session/session_manager.h index a7ede4b..32bc8e7 100644 --- a/src/session/session_manager.h +++ b/src/session/session_manager.h @@ -48,7 +48,7 @@ struct session_manager_options uint32_t tcp_reassembly_max_segments; // range: [2, 512] }; -struct session_manager_stat +struct __attribute__((aligned(64))) session_manager_stat { // TCP session uint64_t history_tcp_sessions; diff --git a/test/decoders/http/plugin_test_main.cpp b/test/decoders/http/plugin_test_main.cpp index 5cb0004..90b5bdf 100644 --- a/test/decoders/http/plugin_test_main.cpp +++ b/test/decoders/http/plugin_test_main.cpp @@ -140,10 +140,10 @@ int main(int argc, char *argv[]) ::testing::InitGoogleTest(&argc, argv); struct stellar *st = stellar_new("./conf/stellar.toml", "./plugin/spec.toml", "./conf/log.toml"); stellar_run(st); + stellar_free(st); if (result_json_path != NULL) { ret = RUN_ALL_TESTS(); } - stellar_free(st); return ret; } diff --git a/test/decoders/http/test_based_on_stellar/env/stellar.toml b/test/decoders/http/test_based_on_stellar/env/stellar.toml index 2b59a81..9314c67 100644 --- a/test/decoders/http/test_based_on_stellar/env/stellar.toml +++ b/test/decoders/http/test_based_on_stellar/env/stellar.toml @@ -34,10 +34,10 @@ tcp_handshake_timeout = 50 # range: [1, 60000] (ms) tcp_data_timeout = 50 # range: [1, 15999999000] (ms) tcp_half_closed_timeout = 50 # range: [1, 604800000] (ms) tcp_time_wait_timeout = 50 # range: [1, 600000] (ms) -tcp_discard_timeout = 10 # range: [1, 15999999000] (ms) +tcp_discard_timeout = 10 # range: [1, 15999999000] (ms) tcp_unverified_rst_timeout = 50 # range: [1, 600000] (ms) # UDP timeout -udp_data_timeout = 50 # range: [1, 15999999000] (ms) +udp_data_timeout = 50 # range: [1, 15999999000] (ms) udp_discard_timeout = 50 # range: [1, 15999999000] (ms) # duplicate packet filter @@ -59,14 +59,15 @@ tcp_reassembly_max_segments = 256 # range: [2, 4096] [schedule] # Note: free_expired_session_interval determines the precision of session_manager timeout -free_expired_session_interval = 50 # range: [1, 60000] (ms) -free_expired_session_batch = 100 # range: [1, 60000] +free_expired_session_interval = 50 # range: [1, 60000] (ms) +free_expired_session_batch = 100 # range: [1, 60000] +froce_session_expire_before_exit = 0 # 1: force session to expire before exit, 0: wait for session to naturally expire before exit. # Note: free_expired_ip_frag_interval determines the precision of ip_reassembly timeout free_expired_ip_frag_interval = 50 # range: [1, 60000] (ms) -free_expired_ip_frag_batch = 100 # range: [1, 60000] +free_expired_ip_frag_batch = 100 # range: [1, 60000] -merge_stat_interval = 50 # range: [1, 60000] (ms) +merge_stat_interval = 50 # range: [1, 60000] (ms) output_stat_interval = 10 # range: [1, 60000] (ms) packet_io_yield_interval = 90 # range: [1, 60000] (ms) diff --git a/test/packet_inject/conf/stellar.toml b/test/packet_inject/conf/stellar.toml index 18860ef..9c91997 100644 --- a/test/packet_inject/conf/stellar.toml +++ b/test/packet_inject/conf/stellar.toml @@ -59,8 +59,9 @@ tcp_reassembly_max_segments = 128 # range: [2, 4096] [schedule] # Note: free_expired_session_interval determines the precision of session_manager timeout -free_expired_session_interval = 50 # range: [1, 60000] (ms) -free_expired_session_batch = 1000 # range: [1, 60000] +free_expired_session_interval = 50 # range: [1, 60000] (ms) +free_expired_session_batch = 1000 # range: [1, 60000] +froce_session_expire_before_exit = 0 # 1: force session to expire before exit, 0: wait for session to naturally expire before exit. # Note: free_expired_ip_frag_interval determines the precision of ip_reassembly timeout free_expired_ip_frag_interval = 50 # range: [1, 60000] (ms)