diff --git a/conf/stellar.toml b/conf/stellar.toml index bbefa23..0ba7fba 100644 --- a/conf/stellar.toml +++ b/conf/stellar.toml @@ -54,3 +54,17 @@ evicted_session_filter_error_rate = 0.00001 # range: [0.0, 1.0] tcp_reassembly_enable = 1 tcp_reassembly_max_timeout = 10000 # range: [1, 60000] (ms) tcp_reassembly_max_segments = 256 # range: [2, 4096] + +[schedule] +# Note: free_expired_session_interval determines the precision of session_manager timeout +free_expired_session_interval = 50 # range: [1, 60000] (ms) +free_expired_session_batch = 1000 # range: [1, 60000] + +# Note: free_expired_ip_frag_interval determines the precision of ip_reassembly timeout +free_expired_ip_frag_interval = 50 # range: [1, 60000] (ms) +free_expired_ip_frag_batch = 1000 # range: [1, 60000] + +merge_stat_interval = 500 # range: [1, 60000] (ms) +output_stat_interval = 2000 # range: [1, 60000] (ms) + +packet_io_yield_interval = 900 # range: [1, 60000] (ms) diff --git a/src/ip_reassembly/ip_reassembly.cpp b/src/ip_reassembly/ip_reassembly.cpp index 17c3f8f..b325f97 100644 --- a/src/ip_reassembly/ip_reassembly.cpp +++ b/src/ip_reassembly/ip_reassembly.cpp @@ -817,8 +817,9 @@ void ip_reassembly_free(struct ip_reassembly *assy) } } -void ip_reassembly_expire(struct ip_reassembly *assy, uint64_t now) +void ip_reassembly_expire(struct ip_reassembly *assy, uint64_t max_free, uint64_t now) { + uint64_t count = 0; struct ip_flow *flow = NULL; uint64_t timeout = assy->timeout; TAILQ_FOREACH(flow, &assy->lru, lru) @@ -829,6 +830,12 @@ void ip_reassembly_expire(struct ip_reassembly *assy, uint64_t now) ip_reassembly_del_flow(assy, flow); ip_reassembly_stat_inc(assy, timeout, &flow->key); ip_flow_free(flow); + count++; + + if (count >= max_free) + { + break; + } } else { diff --git a/src/ip_reassembly/ip_reassembly.h b/src/ip_reassembly/ip_reassembly.h index de81280..57ede53 100644 --- a/src/ip_reassembly/ip_reassembly.h +++ b/src/ip_reassembly/ip_reassembly.h @@ -44,7 +44,7 @@ struct ip_reassembly_stat struct ip_reassembly *ip_reassembly_new(const struct ip_reassembly_options *opts); void ip_reassembly_free(struct ip_reassembly *assy); -void ip_reassembly_expire(struct ip_reassembly *assy, uint64_t now); +void ip_reassembly_expire(struct ip_reassembly *assy, uint64_t max_free, uint64_t now); struct ip_reassembly_stat *ip_reassembly_stat(struct ip_reassembly *assy); /* diff --git a/src/packet_io/dumpfile_io.cpp b/src/packet_io/dumpfile_io.cpp index 9e126a6..c6ede96 100644 --- a/src/packet_io/dumpfile_io.cpp +++ b/src/packet_io/dumpfile_io.cpp @@ -197,6 +197,7 @@ static void *dumpfile_thread(void *arg) PACKET_IO_LOG_STATE("dumpfile io thread is running"); scan_directory(handle->directory, dumpfile_handler, arg); + PACKET_IO_LOG_STATE("dumpfile io thread processed all pcap files"); while (ATOMIC_READ(&handle->io_thread_need_exit) == 0) { @@ -205,11 +206,10 @@ static void *dumpfile_thread(void *arg) ATOMIC_SET(&handle->io_thread_wait_exit, 1); } - PACKET_IO_LOG_STATE("dumpfile io thread has processed all pcap files, wait for exit"); - sleep(1); + usleep(1000); // 1ms } - PACKET_IO_LOG_STATE("dumpfile io thread exit !!!"); + PACKET_IO_LOG_STATE("dumpfile io thread exit"); ATOMIC_SET(&handle->io_thread_is_runing, 0); return NULL; diff --git a/src/stellar/stellar_config.cpp b/src/stellar/stellar_config.cpp index f42b451..3126424 100644 --- a/src/stellar/stellar_config.cpp +++ b/src/stellar/stellar_config.cpp @@ -405,6 +405,114 @@ static int parse_session_manager_section(toml_table_t *root, struct session_mana return 0; } +// return 0: success +// retuun -1: failed +static int parse_schedule_options(toml_table_t *root, struct schedule_options *opts) +{ + const char *ptr; + toml_table_t *table; + + table = toml_table_in(root, "schedule"); + if (table == NULL) + { + CONFIG_LOG_ERROR("config file missing schedule section"); + return -1; + } + + ptr = toml_raw_in(table, "free_expired_session_interval"); + if (ptr == NULL) + { + CONFIG_LOG_ERROR("config file missing schedule->free_expired_session_interval"); + return -1; + } + opts->free_expired_session_interval = atoll(ptr); + if (opts->free_expired_session_interval < 1 || opts->free_expired_session_interval > 60000) + { + CONFIG_LOG_ERROR("config file invalid schedule->free_expired_session_interval %ld, range [1, 60000]", opts->free_expired_session_interval); + return -1; + } + + ptr = toml_raw_in(table, "free_expired_session_batch"); + if (ptr == NULL) + { + CONFIG_LOG_ERROR("config file missing schedule->free_expired_session_batch"); + return -1; + } + opts->free_expired_session_batch = atoll(ptr); + if (opts->free_expired_session_batch < 1 || opts->free_expired_session_batch > 60000) + { + CONFIG_LOG_ERROR("config file invalid schedule->free_expired_session_batch %ld, range [1, 60000]", opts->free_expired_session_batch); + return -1; + } + + ptr = toml_raw_in(table, "free_expired_ip_frag_interval"); + if (ptr == NULL) + { + CONFIG_LOG_ERROR("config file missing schedule->free_expired_ip_frag_interval"); + return -1; + } + opts->free_expired_ip_frag_interval = atoll(ptr); + if (opts->free_expired_ip_frag_interval < 1 || opts->free_expired_ip_frag_interval > 60000) + { + CONFIG_LOG_ERROR("config file invalid schedule->free_expired_ip_frag_interval %ld, range [1, 60000]", opts->free_expired_ip_frag_interval); + return -1; + } + + ptr = toml_raw_in(table, "free_expired_ip_frag_batch"); + if (ptr == NULL) + { + CONFIG_LOG_ERROR("config file missing schedule->free_expired_ip_frag_batch"); + return -1; + } + opts->free_expired_ip_frag_batch = atoll(ptr); + if (opts->free_expired_ip_frag_batch < 1 || opts->free_expired_ip_frag_batch > 60000) + { + CONFIG_LOG_ERROR("config file invalid schedule->free_expired_ip_frag_batch %ld, range [1, 60000]", opts->free_expired_ip_frag_batch); + return -1; + } + + ptr = toml_raw_in(table, "merge_stat_interval"); + if (ptr == NULL) + { + CONFIG_LOG_ERROR("config file missing schedule->merge_stat_interval"); + return -1; + } + opts->merge_stat_interval = atoll(ptr); + if (opts->merge_stat_interval < 1 || opts->merge_stat_interval > 60000) + { + CONFIG_LOG_ERROR("config file invalid schedule->merge_stat_interval %ld, range [1, 60000]", opts->merge_stat_interval); + return -1; + } + + ptr = toml_raw_in(table, "output_stat_interval"); + if (ptr == NULL) + { + CONFIG_LOG_ERROR("config file missing schedule->output_stat_interval"); + return -1; + } + opts->output_stat_interval = atoll(ptr); + if (opts->output_stat_interval < 1 || opts->output_stat_interval > 60000) + { + CONFIG_LOG_ERROR("config file invalid schedule->output_stat_interval %ld, range [1, 60000]", opts->output_stat_interval); + return -1; + } + + ptr = toml_raw_in(table, "packet_io_yield_interval"); + if (ptr == NULL) + { + CONFIG_LOG_ERROR("config file missing schedule->packet_io_yield_interval"); + return -1; + } + opts->packet_io_yield_interval = atoll(ptr); + if (opts->packet_io_yield_interval < 1 || opts->packet_io_yield_interval > 60000) + { + CONFIG_LOG_ERROR("config file invalid schedule->packet_io_yield_interval %ld, range [1, 60000]", opts->packet_io_yield_interval); + return -1; + } + + return 0; +} + // return 0: success // retuun -1: failed int stellar_config_load(struct stellar_config *config, const char *file) @@ -448,6 +556,11 @@ int stellar_config_load(struct stellar_config *config, const char *file) goto error_out; } + if (parse_schedule_options(table, &config->sched_opts) != 0) + { + goto error_out; + } + ret = 0; error_out: @@ -538,4 +651,13 @@ void stellar_config_print(const struct stellar_config *config) CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_enable : %d", sess_mgr_opts->tcp_reassembly_enable); CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_max_timeout : %d", sess_mgr_opts->tcp_reassembly_max_timeout); CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_max_segments : %d", sess_mgr_opts->tcp_reassembly_max_segments); + + // schedule config + CONFIG_LOG_DEBUG("schedule->free_expired_session_interval : %ld", config->sched_opts.free_expired_session_interval); + CONFIG_LOG_DEBUG("schedule->free_expired_session_batch : %ld", config->sched_opts.free_expired_session_batch); + CONFIG_LOG_DEBUG("schedule->free_expired_ip_frag_interval : %ld", config->sched_opts.free_expired_ip_frag_interval); + CONFIG_LOG_DEBUG("schedule->free_expired_ip_frag_batch : %ld", config->sched_opts.free_expired_ip_frag_batch); + CONFIG_LOG_DEBUG("schedule->merge_stat_interval : %ld", config->sched_opts.merge_stat_interval); + CONFIG_LOG_DEBUG("schedule->output_stat_interval : %ld", config->sched_opts.output_stat_interval); + CONFIG_LOG_DEBUG("schedule->packet_io_yield_interval : %ld", config->sched_opts.packet_io_yield_interval); } diff --git a/src/stellar/stellar_config.h b/src/stellar/stellar_config.h index d0724d1..3fde27f 100644 --- a/src/stellar/stellar_config.h +++ b/src/stellar/stellar_config.h @@ -10,12 +10,29 @@ extern "C" #include "ip_reassembly.h" #include "session_manager.h" +struct schedule_options +{ + // Note: free_expired_session_interval determines the precision of session_manager timeout + uint64_t free_expired_session_interval; // range: [1, 60000] (ms) + uint64_t free_expired_session_batch; // range: [1, 60000] + + // Note: free_expired_ip_frag_interval determines the precision of ip_reassembly timeout + uint64_t free_expired_ip_frag_interval; // range: [1, 60000] (ms) + uint64_t free_expired_ip_frag_batch; // range: [1, 60000] + + uint64_t merge_stat_interval; // range: [1, 60000] (ms) + uint64_t output_stat_interval; // range: [1, 60000] (ms) + + uint64_t packet_io_yield_interval; // range: [1, 60000] (ms) +}; + struct stellar_config { struct packet_io_options pkt_io_opts; struct id_generator_options id_gen_opts; struct ip_reassembly_options ip_reass_opts; struct session_manager_options sess_mgr_opts; + struct schedule_options sched_opts; }; int stellar_config_load(struct stellar_config *config, const char *file); diff --git a/src/stellar/stellar_core.cpp b/src/stellar/stellar_core.cpp index 80f437c..898a794 100644 --- a/src/stellar/stellar_core.cpp +++ b/src/stellar/stellar_core.cpp @@ -26,12 +26,28 @@ #define STELLAR_LOG_ERROR(format, ...) LOG_ERROR("stellar", format, ##__VA_ARGS__) #define STELLAR_LOG_DEBUG(format, ...) LOG_DEBUG("stellar", format, ##__VA_ARGS__) +struct schedule_data +{ + uint64_t last_free_expired_session_timestamp; + uint64_t last_free_expired_ip_frag_timestamp; + uint64_t last_merge_thread_stat_timestamp; + + uint64_t free_expired_session_interval; + uint64_t free_expired_session_batch; + + uint64_t free_expired_ip_frag_interval; + uint64_t free_expired_ip_frag_batch; + + uint64_t merge_stat_interval; + uint64_t packet_io_yield_interval; +}; + struct stellar_thread { pthread_t tid; uint16_t idx; uint64_t is_runing; - uint64_t timing_wheel_last_update_ts; + struct schedule_data sched_data; struct ip_reassembly *ip_mgr; struct session_manager *sess_mgr; struct stellar_runtime *runtime; @@ -158,9 +174,9 @@ static void *work_thread(void *arg) struct ip_reassembly *ip_reass = thread->ip_mgr; struct session_manager *sess_mgr = thread->sess_mgr; struct stellar_runtime *runtime = thread->runtime; + struct schedule_data *sched_data = &thread->sched_data; struct packet_io *packet_io = runtime->packet_io; struct plugin_manager_schema *plug_mgr = runtime->plug_mgr; - uint16_t thr_idx = thread->idx; __thread_id = thr_idx; @@ -293,26 +309,37 @@ static void *work_thread(void *arg) idle_tasks: // nr_recv packet atmost trigger nr_recv session evicted free_evicted_sessions(sess_mgr, nr_recv); + plugin_manager_on_polling(runtime->plug_mgr); - // per 5 ms, atmost free 8 expired session - if (now_ms - thread->timing_wheel_last_update_ts > 5) + // per free_expired_session_interval MAX free_expired_session_batch sessions are released + if (now_ms - sched_data->last_free_expired_session_timestamp > sched_data->free_expired_session_interval) { - free_expired_sessions(sess_mgr, 8, now_ms); - thread->timing_wheel_last_update_ts = now_ms; + free_expired_sessions(sess_mgr, sched_data->free_expired_session_batch, now_ms); + sched_data->last_free_expired_session_timestamp = now_ms; } - merge_thread_stat(thread); - ip_reassembly_expire(ip_reass, now_ms); - plugin_manager_on_polling(runtime->plug_mgr); + // per merge_stat_interval merge thread stat + if (now_ms - sched_data->last_merge_thread_stat_timestamp > sched_data->merge_stat_interval) + { + merge_thread_stat(thread); + sched_data->last_merge_thread_stat_timestamp = now_ms; + } + + // per free_expired_ip_frag_interval MAX free_expired_ip_frag_batch ip fragments are released + if (now_ms - sched_data->last_free_expired_ip_frag_timestamp > sched_data->free_expired_ip_frag_interval) + { + ip_reassembly_expire(ip_reass, sched_data->free_expired_ip_frag_batch, now_ms); + sched_data->last_free_expired_ip_frag_timestamp = now_ms; + } if (nr_recv == 0) { - packet_io_yield(packet_io, thr_idx, 900); + packet_io_yield(packet_io, thr_idx, sched_data->packet_io_yield_interval); } } ATOMIC_SET(&thread->is_runing, 0); - STELLAR_LOG_STATE("worker thread %d exit !!!", thr_idx); + STELLAR_LOG_STATE("worker thread %d exit", thr_idx); return NULL; } @@ -325,19 +352,19 @@ static void signal_handler(int signo) { if (signo == SIGINT) { - STELLAR_LOG_STATE("SIGINT received, notify threads to exit !!!"); + STELLAR_LOG_STATE("SIGINT received, notify threads to exit"); ATOMIC_SET(&need_exit, 1); } if (signo == SIGQUIT) { - STELLAR_LOG_STATE("SIGQUIT received, notify threads to exit !!!"); + STELLAR_LOG_STATE("SIGQUIT received, notify threads to exit"); ATOMIC_SET(&need_exit, 1); } if (signo == SIGTERM) { - STELLAR_LOG_STATE("SIGTERM received, notify threads to exit !!!"); + STELLAR_LOG_STATE("SIGTERM received, notify threads to exit"); ATOMIC_SET(&need_exit, 1); } @@ -377,7 +404,20 @@ static int stellar_thread_init(struct stellar_runtime *runtime, struct stellar_c struct stellar_thread *thread = &runtime->threads[i]; thread->idx = i; thread->is_runing = 0; - thread->timing_wheel_last_update_ts = now_ms; + + thread->sched_data.last_free_expired_session_timestamp = now_ms; + thread->sched_data.last_free_expired_ip_frag_timestamp = now_ms; + thread->sched_data.last_merge_thread_stat_timestamp = now_ms; + + thread->sched_data.free_expired_session_interval = config->sched_opts.free_expired_session_interval; + thread->sched_data.free_expired_session_batch = config->sched_opts.free_expired_session_batch; + + thread->sched_data.free_expired_ip_frag_interval = config->sched_opts.free_expired_ip_frag_interval; + thread->sched_data.free_expired_ip_frag_batch = config->sched_opts.free_expired_ip_frag_batch; + + thread->sched_data.merge_stat_interval = config->sched_opts.merge_stat_interval; + thread->sched_data.packet_io_yield_interval = config->sched_opts.packet_io_yield_interval; + thread->sess_mgr = session_manager_new(&config->sess_mgr_opts, now_ms); if (thread->sess_mgr == NULL) { @@ -403,7 +443,6 @@ static void stellar_thread_clean(struct stellar_runtime *runtime, struct stellar struct stellar_thread *thread = &runtime->threads[i]; if (ATOMIC_READ(&thread->is_runing) == 0) { - STELLAR_LOG_STATE("clean worker thread %d context", i); session_manager_free(thread->sess_mgr); ip_reassembly_free(thread->ip_mgr); } @@ -427,13 +466,13 @@ static int stellar_thread_run(struct stellar_runtime *runtime, struct stellar_co static void stellar_thread_join(struct stellar_runtime *runtime, struct stellar_config *config) { + STELLAR_LOG_STATE("wait worker thread exit ..."); for (uint16_t i = 0; i < config->pkt_io_opts.nr_threads; i++) { struct stellar_thread *thread = &runtime->threads[i]; while (ATOMIC_READ(&thread->is_runing) == 1) { - STELLAR_LOG_STATE("wait worker thread %d stop", i); - sleep(1); + usleep(1000); // 1ms } } } @@ -507,7 +546,7 @@ int stellar_run(int argc __attribute__((unused)), char **argv __attribute__((unu runtime->stat_last_output_ts = stellar_get_real_time_msec(); while (!ATOMIC_READ(&need_exit)) { - if (stellar_get_real_time_msec() - runtime->stat_last_output_ts > 2000) + if (stellar_get_real_time_msec() - runtime->stat_last_output_ts > config->sched_opts.output_stat_interval) { runtime->stat_last_output_ts = stellar_get_real_time_msec(); stellar_stat_output(runtime->stat); @@ -518,7 +557,7 @@ int stellar_run(int argc __attribute__((unused)), char **argv __attribute__((unu if (packet_io_wait_exit(runtime->packet_io) && all_session_have_freed(runtime, config)) { stellar_stat_output(runtime->stat); // flush stat - STELLAR_LOG_STATE("all sessions have been released, notify threads to exit !!!"); + STELLAR_LOG_STATE("all sessions have been released, notify threads to exit"); ATOMIC_SET(&need_exit, 1); } } @@ -529,7 +568,7 @@ error_out: packet_io_free(runtime->packet_io); plugin_manager_exit(runtime->plug_mgr); stellar_stat_free(runtime->stat); - STELLAR_LOG_STATE("stellar exit !!!\n"); + STELLAR_LOG_STATE("stellar exit\n"); log_free(); return 0; diff --git a/test/packet_inject/conf/stellar.toml b/test/packet_inject/conf/stellar.toml index 5aa7c7e..2fa1f9d 100644 --- a/test/packet_inject/conf/stellar.toml +++ b/test/packet_inject/conf/stellar.toml @@ -54,3 +54,17 @@ evicted_session_filter_error_rate = 0.00001 # range: [0.0, 1.0] tcp_reassembly_enable = 1 tcp_reassembly_max_timeout = 10000 # range: [1, 60000] (ms) tcp_reassembly_max_segments = 128 # range: [2, 4096] + +[schedule] +# Note: free_expired_session_interval determines the precision of session_manager timeout +free_expired_session_interval = 50 # range: [1, 60000] (ms) +free_expired_session_batch = 1000 # range: [1, 60000] + +# Note: free_expired_ip_frag_interval determines the precision of ip_reassembly timeout +free_expired_ip_frag_interval = 50 # range: [1, 60000] (ms) +free_expired_ip_frag_batch = 1000 # range: [1, 60000] + +merge_stat_interval = 50 # range: [1, 60000] (ms) +output_stat_interval = 2000 # range: [1, 60000] (ms) + +packet_io_yield_interval = 900 # range: [1, 60000] (ms)