enhance: add configuration items to adjust the scheduling parameters of the main loop

This commit is contained in:
luwenpeng
2024-08-15 19:03:48 +08:00
parent 42f44b53b1
commit b0e08133b7
8 changed files with 239 additions and 26 deletions

View File

@@ -54,3 +54,17 @@ evicted_session_filter_error_rate = 0.00001 # range: [0.0, 1.0]
tcp_reassembly_enable = 1
tcp_reassembly_max_timeout = 10000 # range: [1, 60000] (ms)
tcp_reassembly_max_segments = 256 # range: [2, 4096]
[schedule]
# Note: free_expired_session_interval determines the precision of session_manager timeout
free_expired_session_interval = 50 # range: [1, 60000] (ms)
free_expired_session_batch = 1000 # range: [1, 60000]
# Note: free_expired_ip_frag_interval determines the precision of ip_reassembly timeout
free_expired_ip_frag_interval = 50 # range: [1, 60000] (ms)
free_expired_ip_frag_batch = 1000 # range: [1, 60000]
merge_stat_interval = 500 # range: [1, 60000] (ms)
output_stat_interval = 2000 # range: [1, 60000] (ms)
packet_io_yield_interval = 900 # range: [1, 60000] (ms)

View File

@@ -817,8 +817,9 @@ void ip_reassembly_free(struct ip_reassembly *assy)
}
}
void ip_reassembly_expire(struct ip_reassembly *assy, uint64_t now)
void ip_reassembly_expire(struct ip_reassembly *assy, uint64_t max_free, uint64_t now)
{
uint64_t count = 0;
struct ip_flow *flow = NULL;
uint64_t timeout = assy->timeout;
TAILQ_FOREACH(flow, &assy->lru, lru)
@@ -829,6 +830,12 @@ void ip_reassembly_expire(struct ip_reassembly *assy, uint64_t now)
ip_reassembly_del_flow(assy, flow);
ip_reassembly_stat_inc(assy, timeout, &flow->key);
ip_flow_free(flow);
count++;
if (count >= max_free)
{
break;
}
}
else
{

View File

@@ -44,7 +44,7 @@ struct ip_reassembly_stat
struct ip_reassembly *ip_reassembly_new(const struct ip_reassembly_options *opts);
void ip_reassembly_free(struct ip_reassembly *assy);
void ip_reassembly_expire(struct ip_reassembly *assy, uint64_t now);
void ip_reassembly_expire(struct ip_reassembly *assy, uint64_t max_free, uint64_t now);
struct ip_reassembly_stat *ip_reassembly_stat(struct ip_reassembly *assy);
/*

View File

@@ -197,6 +197,7 @@ static void *dumpfile_thread(void *arg)
PACKET_IO_LOG_STATE("dumpfile io thread is running");
scan_directory(handle->directory, dumpfile_handler, arg);
PACKET_IO_LOG_STATE("dumpfile io thread processed all pcap files");
while (ATOMIC_READ(&handle->io_thread_need_exit) == 0)
{
@@ -205,11 +206,10 @@ static void *dumpfile_thread(void *arg)
ATOMIC_SET(&handle->io_thread_wait_exit, 1);
}
PACKET_IO_LOG_STATE("dumpfile io thread has processed all pcap files, wait for exit");
sleep(1);
usleep(1000); // 1ms
}
PACKET_IO_LOG_STATE("dumpfile io thread exit !!!");
PACKET_IO_LOG_STATE("dumpfile io thread exit");
ATOMIC_SET(&handle->io_thread_is_runing, 0);
return NULL;

View File

@@ -405,6 +405,114 @@ static int parse_session_manager_section(toml_table_t *root, struct session_mana
return 0;
}
// return 0: success
// retuun -1: failed
static int parse_schedule_options(toml_table_t *root, struct schedule_options *opts)
{
const char *ptr;
toml_table_t *table;
table = toml_table_in(root, "schedule");
if (table == NULL)
{
CONFIG_LOG_ERROR("config file missing schedule section");
return -1;
}
ptr = toml_raw_in(table, "free_expired_session_interval");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing schedule->free_expired_session_interval");
return -1;
}
opts->free_expired_session_interval = atoll(ptr);
if (opts->free_expired_session_interval < 1 || opts->free_expired_session_interval > 60000)
{
CONFIG_LOG_ERROR("config file invalid schedule->free_expired_session_interval %ld, range [1, 60000]", opts->free_expired_session_interval);
return -1;
}
ptr = toml_raw_in(table, "free_expired_session_batch");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing schedule->free_expired_session_batch");
return -1;
}
opts->free_expired_session_batch = atoll(ptr);
if (opts->free_expired_session_batch < 1 || opts->free_expired_session_batch > 60000)
{
CONFIG_LOG_ERROR("config file invalid schedule->free_expired_session_batch %ld, range [1, 60000]", opts->free_expired_session_batch);
return -1;
}
ptr = toml_raw_in(table, "free_expired_ip_frag_interval");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing schedule->free_expired_ip_frag_interval");
return -1;
}
opts->free_expired_ip_frag_interval = atoll(ptr);
if (opts->free_expired_ip_frag_interval < 1 || opts->free_expired_ip_frag_interval > 60000)
{
CONFIG_LOG_ERROR("config file invalid schedule->free_expired_ip_frag_interval %ld, range [1, 60000]", opts->free_expired_ip_frag_interval);
return -1;
}
ptr = toml_raw_in(table, "free_expired_ip_frag_batch");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing schedule->free_expired_ip_frag_batch");
return -1;
}
opts->free_expired_ip_frag_batch = atoll(ptr);
if (opts->free_expired_ip_frag_batch < 1 || opts->free_expired_ip_frag_batch > 60000)
{
CONFIG_LOG_ERROR("config file invalid schedule->free_expired_ip_frag_batch %ld, range [1, 60000]", opts->free_expired_ip_frag_batch);
return -1;
}
ptr = toml_raw_in(table, "merge_stat_interval");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing schedule->merge_stat_interval");
return -1;
}
opts->merge_stat_interval = atoll(ptr);
if (opts->merge_stat_interval < 1 || opts->merge_stat_interval > 60000)
{
CONFIG_LOG_ERROR("config file invalid schedule->merge_stat_interval %ld, range [1, 60000]", opts->merge_stat_interval);
return -1;
}
ptr = toml_raw_in(table, "output_stat_interval");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing schedule->output_stat_interval");
return -1;
}
opts->output_stat_interval = atoll(ptr);
if (opts->output_stat_interval < 1 || opts->output_stat_interval > 60000)
{
CONFIG_LOG_ERROR("config file invalid schedule->output_stat_interval %ld, range [1, 60000]", opts->output_stat_interval);
return -1;
}
ptr = toml_raw_in(table, "packet_io_yield_interval");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing schedule->packet_io_yield_interval");
return -1;
}
opts->packet_io_yield_interval = atoll(ptr);
if (opts->packet_io_yield_interval < 1 || opts->packet_io_yield_interval > 60000)
{
CONFIG_LOG_ERROR("config file invalid schedule->packet_io_yield_interval %ld, range [1, 60000]", opts->packet_io_yield_interval);
return -1;
}
return 0;
}
// return 0: success
// retuun -1: failed
int stellar_config_load(struct stellar_config *config, const char *file)
@@ -448,6 +556,11 @@ int stellar_config_load(struct stellar_config *config, const char *file)
goto error_out;
}
if (parse_schedule_options(table, &config->sched_opts) != 0)
{
goto error_out;
}
ret = 0;
error_out:
@@ -538,4 +651,13 @@ void stellar_config_print(const struct stellar_config *config)
CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_enable : %d", sess_mgr_opts->tcp_reassembly_enable);
CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_max_timeout : %d", sess_mgr_opts->tcp_reassembly_max_timeout);
CONFIG_LOG_DEBUG("session_manager->tcp_reassembly_max_segments : %d", sess_mgr_opts->tcp_reassembly_max_segments);
// schedule config
CONFIG_LOG_DEBUG("schedule->free_expired_session_interval : %ld", config->sched_opts.free_expired_session_interval);
CONFIG_LOG_DEBUG("schedule->free_expired_session_batch : %ld", config->sched_opts.free_expired_session_batch);
CONFIG_LOG_DEBUG("schedule->free_expired_ip_frag_interval : %ld", config->sched_opts.free_expired_ip_frag_interval);
CONFIG_LOG_DEBUG("schedule->free_expired_ip_frag_batch : %ld", config->sched_opts.free_expired_ip_frag_batch);
CONFIG_LOG_DEBUG("schedule->merge_stat_interval : %ld", config->sched_opts.merge_stat_interval);
CONFIG_LOG_DEBUG("schedule->output_stat_interval : %ld", config->sched_opts.output_stat_interval);
CONFIG_LOG_DEBUG("schedule->packet_io_yield_interval : %ld", config->sched_opts.packet_io_yield_interval);
}

View File

@@ -10,12 +10,29 @@ extern "C"
#include "ip_reassembly.h"
#include "session_manager.h"
struct schedule_options
{
// Note: free_expired_session_interval determines the precision of session_manager timeout
uint64_t free_expired_session_interval; // range: [1, 60000] (ms)
uint64_t free_expired_session_batch; // range: [1, 60000]
// Note: free_expired_ip_frag_interval determines the precision of ip_reassembly timeout
uint64_t free_expired_ip_frag_interval; // range: [1, 60000] (ms)
uint64_t free_expired_ip_frag_batch; // range: [1, 60000]
uint64_t merge_stat_interval; // range: [1, 60000] (ms)
uint64_t output_stat_interval; // range: [1, 60000] (ms)
uint64_t packet_io_yield_interval; // range: [1, 60000] (ms)
};
struct stellar_config
{
struct packet_io_options pkt_io_opts;
struct id_generator_options id_gen_opts;
struct ip_reassembly_options ip_reass_opts;
struct session_manager_options sess_mgr_opts;
struct schedule_options sched_opts;
};
int stellar_config_load(struct stellar_config *config, const char *file);

View File

@@ -26,12 +26,28 @@
#define STELLAR_LOG_ERROR(format, ...) LOG_ERROR("stellar", format, ##__VA_ARGS__)
#define STELLAR_LOG_DEBUG(format, ...) LOG_DEBUG("stellar", format, ##__VA_ARGS__)
struct schedule_data
{
uint64_t last_free_expired_session_timestamp;
uint64_t last_free_expired_ip_frag_timestamp;
uint64_t last_merge_thread_stat_timestamp;
uint64_t free_expired_session_interval;
uint64_t free_expired_session_batch;
uint64_t free_expired_ip_frag_interval;
uint64_t free_expired_ip_frag_batch;
uint64_t merge_stat_interval;
uint64_t packet_io_yield_interval;
};
struct stellar_thread
{
pthread_t tid;
uint16_t idx;
uint64_t is_runing;
uint64_t timing_wheel_last_update_ts;
struct schedule_data sched_data;
struct ip_reassembly *ip_mgr;
struct session_manager *sess_mgr;
struct stellar_runtime *runtime;
@@ -158,9 +174,9 @@ static void *work_thread(void *arg)
struct ip_reassembly *ip_reass = thread->ip_mgr;
struct session_manager *sess_mgr = thread->sess_mgr;
struct stellar_runtime *runtime = thread->runtime;
struct schedule_data *sched_data = &thread->sched_data;
struct packet_io *packet_io = runtime->packet_io;
struct plugin_manager_schema *plug_mgr = runtime->plug_mgr;
uint16_t thr_idx = thread->idx;
__thread_id = thr_idx;
@@ -293,26 +309,37 @@ static void *work_thread(void *arg)
idle_tasks:
// nr_recv packet atmost trigger nr_recv session evicted
free_evicted_sessions(sess_mgr, nr_recv);
plugin_manager_on_polling(runtime->plug_mgr);
// per 5 ms, atmost free 8 expired session
if (now_ms - thread->timing_wheel_last_update_ts > 5)
// per free_expired_session_interval MAX free_expired_session_batch sessions are released
if (now_ms - sched_data->last_free_expired_session_timestamp > sched_data->free_expired_session_interval)
{
free_expired_sessions(sess_mgr, 8, now_ms);
thread->timing_wheel_last_update_ts = now_ms;
free_expired_sessions(sess_mgr, sched_data->free_expired_session_batch, now_ms);
sched_data->last_free_expired_session_timestamp = now_ms;
}
merge_thread_stat(thread);
ip_reassembly_expire(ip_reass, now_ms);
plugin_manager_on_polling(runtime->plug_mgr);
// per merge_stat_interval merge thread stat
if (now_ms - sched_data->last_merge_thread_stat_timestamp > sched_data->merge_stat_interval)
{
merge_thread_stat(thread);
sched_data->last_merge_thread_stat_timestamp = now_ms;
}
// per free_expired_ip_frag_interval MAX free_expired_ip_frag_batch ip fragments are released
if (now_ms - sched_data->last_free_expired_ip_frag_timestamp > sched_data->free_expired_ip_frag_interval)
{
ip_reassembly_expire(ip_reass, sched_data->free_expired_ip_frag_batch, now_ms);
sched_data->last_free_expired_ip_frag_timestamp = now_ms;
}
if (nr_recv == 0)
{
packet_io_yield(packet_io, thr_idx, 900);
packet_io_yield(packet_io, thr_idx, sched_data->packet_io_yield_interval);
}
}
ATOMIC_SET(&thread->is_runing, 0);
STELLAR_LOG_STATE("worker thread %d exit !!!", thr_idx);
STELLAR_LOG_STATE("worker thread %d exit", thr_idx);
return NULL;
}
@@ -325,19 +352,19 @@ static void signal_handler(int signo)
{
if (signo == SIGINT)
{
STELLAR_LOG_STATE("SIGINT received, notify threads to exit !!!");
STELLAR_LOG_STATE("SIGINT received, notify threads to exit");
ATOMIC_SET(&need_exit, 1);
}
if (signo == SIGQUIT)
{
STELLAR_LOG_STATE("SIGQUIT received, notify threads to exit !!!");
STELLAR_LOG_STATE("SIGQUIT received, notify threads to exit");
ATOMIC_SET(&need_exit, 1);
}
if (signo == SIGTERM)
{
STELLAR_LOG_STATE("SIGTERM received, notify threads to exit !!!");
STELLAR_LOG_STATE("SIGTERM received, notify threads to exit");
ATOMIC_SET(&need_exit, 1);
}
@@ -377,7 +404,20 @@ static int stellar_thread_init(struct stellar_runtime *runtime, struct stellar_c
struct stellar_thread *thread = &runtime->threads[i];
thread->idx = i;
thread->is_runing = 0;
thread->timing_wheel_last_update_ts = now_ms;
thread->sched_data.last_free_expired_session_timestamp = now_ms;
thread->sched_data.last_free_expired_ip_frag_timestamp = now_ms;
thread->sched_data.last_merge_thread_stat_timestamp = now_ms;
thread->sched_data.free_expired_session_interval = config->sched_opts.free_expired_session_interval;
thread->sched_data.free_expired_session_batch = config->sched_opts.free_expired_session_batch;
thread->sched_data.free_expired_ip_frag_interval = config->sched_opts.free_expired_ip_frag_interval;
thread->sched_data.free_expired_ip_frag_batch = config->sched_opts.free_expired_ip_frag_batch;
thread->sched_data.merge_stat_interval = config->sched_opts.merge_stat_interval;
thread->sched_data.packet_io_yield_interval = config->sched_opts.packet_io_yield_interval;
thread->sess_mgr = session_manager_new(&config->sess_mgr_opts, now_ms);
if (thread->sess_mgr == NULL)
{
@@ -403,7 +443,6 @@ static void stellar_thread_clean(struct stellar_runtime *runtime, struct stellar
struct stellar_thread *thread = &runtime->threads[i];
if (ATOMIC_READ(&thread->is_runing) == 0)
{
STELLAR_LOG_STATE("clean worker thread %d context", i);
session_manager_free(thread->sess_mgr);
ip_reassembly_free(thread->ip_mgr);
}
@@ -427,13 +466,13 @@ static int stellar_thread_run(struct stellar_runtime *runtime, struct stellar_co
static void stellar_thread_join(struct stellar_runtime *runtime, struct stellar_config *config)
{
STELLAR_LOG_STATE("wait worker thread exit ...");
for (uint16_t i = 0; i < config->pkt_io_opts.nr_threads; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
while (ATOMIC_READ(&thread->is_runing) == 1)
{
STELLAR_LOG_STATE("wait worker thread %d stop", i);
sleep(1);
usleep(1000); // 1ms
}
}
}
@@ -507,7 +546,7 @@ int stellar_run(int argc __attribute__((unused)), char **argv __attribute__((unu
runtime->stat_last_output_ts = stellar_get_real_time_msec();
while (!ATOMIC_READ(&need_exit))
{
if (stellar_get_real_time_msec() - runtime->stat_last_output_ts > 2000)
if (stellar_get_real_time_msec() - runtime->stat_last_output_ts > config->sched_opts.output_stat_interval)
{
runtime->stat_last_output_ts = stellar_get_real_time_msec();
stellar_stat_output(runtime->stat);
@@ -518,7 +557,7 @@ int stellar_run(int argc __attribute__((unused)), char **argv __attribute__((unu
if (packet_io_wait_exit(runtime->packet_io) && all_session_have_freed(runtime, config))
{
stellar_stat_output(runtime->stat); // flush stat
STELLAR_LOG_STATE("all sessions have been released, notify threads to exit !!!");
STELLAR_LOG_STATE("all sessions have been released, notify threads to exit");
ATOMIC_SET(&need_exit, 1);
}
}
@@ -529,7 +568,7 @@ error_out:
packet_io_free(runtime->packet_io);
plugin_manager_exit(runtime->plug_mgr);
stellar_stat_free(runtime->stat);
STELLAR_LOG_STATE("stellar exit !!!\n");
STELLAR_LOG_STATE("stellar exit\n");
log_free();
return 0;

View File

@@ -54,3 +54,17 @@ evicted_session_filter_error_rate = 0.00001 # range: [0.0, 1.0]
tcp_reassembly_enable = 1
tcp_reassembly_max_timeout = 10000 # range: [1, 60000] (ms)
tcp_reassembly_max_segments = 128 # range: [2, 4096]
[schedule]
# Note: free_expired_session_interval determines the precision of session_manager timeout
free_expired_session_interval = 50 # range: [1, 60000] (ms)
free_expired_session_batch = 1000 # range: [1, 60000]
# Note: free_expired_ip_frag_interval determines the precision of ip_reassembly timeout
free_expired_ip_frag_interval = 50 # range: [1, 60000] (ms)
free_expired_ip_frag_batch = 1000 # range: [1, 60000]
merge_stat_interval = 50 # range: [1, 60000] (ms)
output_stat_interval = 2000 # range: [1, 60000] (ms)
packet_io_yield_interval = 900 # range: [1, 60000] (ms)