stellar stat parses output related configuration items

This commit is contained in:
luwenpeng
2024-09-02 16:03:08 +08:00
parent 9069dceae7
commit beb7d2f0ca
9 changed files with 228 additions and 294 deletions

View File

@@ -13,7 +13,6 @@
#include "log_private.h"
#include "stellar_stat.h"
#include "stellar_core.h"
#include "stellar_config.h"
#include "packet_private.h"
#include "plugin_manager.h"
#include "session_private.h"
@@ -41,7 +40,6 @@ struct stellar_thread
pthread_t tid;
uint16_t idx;
uint64_t is_runing;
uint64_t last_merge_thread_stat_timestamp;
struct ip_reassembly *ip_reass;
struct session_manager *sess_mgr;
struct stellar *st;
@@ -50,24 +48,27 @@ struct stellar_thread
struct stellar_runtime
{
uint64_t need_exit;
uint64_t stat_last_output_ts;
struct logger *logger;
struct stellar_stat *stat;
struct packet_io *packet_io;
struct plugin_manager_schema *plug_mgr;
struct stellar_thread threads[MAX_THREAD_NUM];
};
// config
struct stellar_config
{
uint64_t instance_id;
struct session_manager_config *sess_mgr_cfg;
struct ip_reassembly_config *ip_reass_cfg;
struct packet_io_config *pkt_io_cfg;
struct stellar_stat_config *stat_cfg;
};
struct stellar
{
struct stellar_runtime runtime;
char magic[2]; // for check memory corruption
struct stellar_config config;
struct stellar_runtime runtime;
};
static __thread uint16_t __current_thread_idx = UINT16_MAX;
@@ -131,17 +132,14 @@ static void *worker_thread(void *arg)
struct session_manager *sess_mgr = thread->sess_mgr;
struct session_manager_stat *sess_stat = session_manager_stat(sess_mgr);
struct stellar *st = thread->st;
struct stellar_config *config = &st->config;
struct stellar_runtime *runtime = &st->runtime;
struct packet_io *packet_io = runtime->packet_io;
struct plugin_manager_schema *plug_mgr = runtime->plug_mgr;
struct thread_stat thr_stat = {
.packet_io = packet_io_stat(packet_io, thread->idx),
.ip_reassembly = ip_reassembly_stat(ip_reass),
.session_mgr = session_manager_stat(sess_mgr),
.pkt_io = packet_io_stat(packet_io, thread->idx),
.ip_reass = ip_reassembly_stat(ip_reass),
.sess_mgr = session_manager_stat(sess_mgr),
};
uint64_t merge_stat_interval = config->sched_opts.merge_stat_interval;
uint16_t thr_idx = thread->idx;
__current_thread_idx = thr_idx;
@@ -222,10 +220,6 @@ static void *worker_thread(void *arg)
goto fast_path;
}
}
if (packet_get_session_id(pkt) == 0)
{
packet_set_session_id(pkt, session_get_id(sess));
}
plugin_manager_on_session_input(sess, pkt);
fast_path:
@@ -285,13 +279,7 @@ static void *worker_thread(void *arg)
clean_session(sess_mgr, now_ms);
ip_reassembly_expire(ip_reass, now_ms);
plugin_manager_on_polling(plug_mgr);
// per merge_stat_interval merge thread stat
if (now_ms - thread->last_merge_thread_stat_timestamp >= merge_stat_interval)
{
stellar_stat_merge(runtime->stat, &thr_stat, thread->idx);
thread->last_merge_thread_stat_timestamp = now_ms;
}
stellar_stat_merge(runtime->stat, &thr_stat, thr_idx, now_ms);
if (nr_pkt_received == 0)
{
@@ -307,7 +295,7 @@ static void *worker_thread(void *arg)
usleep(1000); // 1ms
}
stellar_stat_merge(runtime->stat, &thr_stat, thread->idx);
stellar_stat_merge(runtime->stat, &thr_stat, thread->idx, UINT64_MAX);
stellar_stat_print(runtime->stat, &thr_stat, thread->idx);
ATOMIC_SET(&thread->is_runing, 0);
@@ -323,25 +311,24 @@ static void *worker_thread(void *arg)
static int stellar_thread_init(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
struct stellar_config *config = &st->config;
uint64_t now_ms = clock_get_real_time_ms();
for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
thread->idx = i;
thread->is_runing = 0;
thread->last_merge_thread_stat_timestamp = now_ms;
runtime->sess_mgr_cfg->session_id_seed = runtime->instance_id << 8 | i;
thread->sess_mgr = session_manager_new(runtime->sess_mgr_cfg, now_ms);
config->sess_mgr_cfg->session_id_seed = config->instance_id << 8 | i;
thread->sess_mgr = session_manager_new(config->sess_mgr_cfg, now_ms);
if (thread->sess_mgr == NULL)
{
CORE_LOG_ERROR("unable to create session manager");
return -1;
}
thread->ip_reass = ip_reassembly_new(runtime->ip_reass_cfg, now_ms);
thread->ip_reass = ip_reassembly_new(config->ip_reass_cfg, now_ms);
if (thread->ip_reass == NULL)
{
CORE_LOG_ERROR("unable to create ip reassemble manager");
@@ -356,9 +343,10 @@ static int stellar_thread_init(struct stellar *st)
static void stellar_thread_clean(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
struct stellar_config *config = &st->config;
CORE_LOG_FATAL("cleaning worker thread context ...");
for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
if (ATOMIC_READ(&thread->is_runing) == 0)
@@ -373,8 +361,9 @@ static void stellar_thread_clean(struct stellar *st)
static int stellar_thread_run(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
struct stellar_config *config = &st->config;
for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
if (pthread_create(&thread->tid, NULL, worker_thread, (void *)thread) < 0)
@@ -390,9 +379,10 @@ static int stellar_thread_run(struct stellar *st)
static void stellar_thread_join(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
struct stellar_config *config = &st->config;
CORE_LOG_FATAL("waiting worker thread stop ...");
for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
for (uint16_t i = 0; i < config->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
while (ATOMIC_READ(&thread->is_runing) == 1)
@@ -426,7 +416,8 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
{
return NULL;
}
st->magic[0] = 0x4d;
st->magic[1] = 0x5a;
struct stellar_runtime *runtime = &st->runtime;
struct stellar_config *config = &st->config;
@@ -442,42 +433,43 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
CORE_LOG_FATAL("plugin config file : %s", plugin_cfg_file);
CORE_LOG_FATAL("log config file : %s", log_cfg_file);
if (load_and_validate_toml_integer_config(stellar_cfg_file, "instance.id", (uint64_t *)&runtime->instance_id, 0, 4095) != 0)
if (load_and_validate_toml_integer_config(stellar_cfg_file, "instance.id", (uint64_t *)&config->instance_id, 0, 4095) != 0)
{
CORE_LOG_ERROR("unable to load instance id");
goto error_out;
}
runtime->sess_mgr_cfg = session_manager_config_new(stellar_cfg_file);
if (runtime->sess_mgr_cfg == NULL)
config->sess_mgr_cfg = session_manager_config_new(stellar_cfg_file);
if (config->sess_mgr_cfg == NULL)
{
CORE_LOG_ERROR("unable to create session manager config");
goto error_out;
}
runtime->ip_reass_cfg = ip_reassembly_config_new(stellar_cfg_file);
if (runtime->ip_reass_cfg == NULL)
config->ip_reass_cfg = ip_reassembly_config_new(stellar_cfg_file);
if (config->ip_reass_cfg == NULL)
{
CORE_LOG_ERROR("unable to create ip reassembly config");
goto error_out;
}
runtime->pkt_io_cfg = packet_io_config_new(stellar_cfg_file);
if (runtime->pkt_io_cfg == NULL)
config->pkt_io_cfg = packet_io_config_new(stellar_cfg_file);
if (config->pkt_io_cfg == NULL)
{
CORE_LOG_ERROR("unable to create packet io config");
goto error_out;
}
packet_io_config_print(runtime->pkt_io_cfg);
session_manager_config_print(runtime->sess_mgr_cfg);
ip_reassembly_config_print(runtime->ip_reass_cfg);
if (stellar_config_load(config, stellar_cfg_file) != 0)
config->stat_cfg = stellar_stat_config_new(stellar_cfg_file);
if (config->stat_cfg == NULL)
{
CORE_LOG_ERROR("unable to load config file");
CORE_LOG_ERROR("unable to create stellar stat config");
goto error_out;
}
stellar_config_print(config);
runtime->stat = stellar_stat_new(runtime->pkt_io_cfg->nr_worker_thread);
session_manager_config_print(config->sess_mgr_cfg);
ip_reassembly_config_print(config->ip_reass_cfg);
packet_io_config_print(config->pkt_io_cfg);
stellar_stat_config_print(config->stat_cfg);
runtime->stat = stellar_stat_new(config->stat_cfg, clock_get_real_time_ms());
if (runtime->stat == NULL)
{
CORE_LOG_ERROR("unable to create stellar stat");
@@ -490,7 +482,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
goto error_out;
}
runtime->packet_io = packet_io_new(runtime->pkt_io_cfg);
runtime->packet_io = packet_io_new(config->pkt_io_cfg);
if (runtime->packet_io == NULL)
{
CORE_LOG_ERROR("unable to create packet io");
@@ -523,7 +515,6 @@ void stellar_run(struct stellar *st)
}
struct stellar_runtime *runtime = &st->runtime;
struct stellar_config *config = &st->config;
if (stellar_thread_run(st) != 0)
{
@@ -531,14 +522,16 @@ void stellar_run(struct stellar *st)
return;
}
runtime->stat_last_output_ts = clock_get_real_time_ms();
while (!ATOMIC_READ(&runtime->need_exit))
{
if (clock_get_real_time_ms() - runtime->stat_last_output_ts >= config->sched_opts.output_stat_interval)
if (st->magic[0] != 0x4d || st->magic[1] != 0x5a)
{
runtime->stat_last_output_ts = clock_get_real_time_ms();
stellar_stat_output(runtime->stat);
CORE_LOG_FATAL("memory corruption detected");
ATOMIC_SET(&runtime->need_exit, 1);
break;
}
stellar_stat_output(runtime->stat, clock_get_real_time_ms());
usleep(1000); // 1ms
// only available in pcap mode
@@ -546,13 +539,12 @@ void stellar_run(struct stellar *st)
{
ATOMIC_SET(&runtime->need_exit, 1);
CORE_LOG_FATAL("notify worker thread to exit");
stellar_stat_output(runtime->stat); // flush stat
break;
}
}
stellar_thread_join(st);
stellar_stat_output(runtime->stat);
stellar_stat_output(runtime->stat, UINT64_MAX);
}
void stellar_free(struct stellar *st)
@@ -560,14 +552,18 @@ void stellar_free(struct stellar *st)
if (st)
{
struct stellar_runtime *runtime = &st->runtime;
struct stellar_config *config = &st->config;
stellar_thread_clean(st);
packet_io_free(runtime->packet_io);
plugin_manager_exit(runtime->plug_mgr);
stellar_stat_free(runtime->stat);
packet_io_config_free(runtime->pkt_io_cfg);
ip_reassembly_config_free(runtime->ip_reass_cfg);
session_manager_config_free(runtime->sess_mgr_cfg);
session_manager_config_free(config->sess_mgr_cfg);
ip_reassembly_config_free(config->ip_reass_cfg);
packet_io_config_free(config->pkt_io_cfg);
stellar_stat_config_free(config->stat_cfg);
CORE_LOG_FATAL("stellar exit\n");
log_free(runtime->logger);
@@ -654,7 +650,7 @@ void stellar_send_build_packet(struct stellar *st, struct packet *pkt)
int stellar_get_worker_thread_num(struct stellar *st)
{
return st->runtime.pkt_io_cfg->nr_worker_thread;
return st->config.pkt_io_cfg->nr_worker_thread;
}
struct logger *stellar_get_logger(struct stellar *st)