Adjust thread index type to uint16 for future expansion & Organize stellar directory files

This commit is contained in:
luwenpeng
2024-04-25 16:48:50 +08:00
parent 17ca476c24
commit 611fda598f
22 changed files with 232 additions and 222 deletions

View File

@@ -90,7 +90,6 @@ uint16_t packet_get_payload_len(const struct packet *pkt);
int packet_need_drop(const struct packet *pkt);
void packet_set_drop(struct packet *pkt);
void packet_set_ctrl(struct packet *pkt);
int packet_is_ctrl(const struct packet *pkt);
void packet_set_direction(struct packet *pkt, int dir);

View File

@@ -60,6 +60,10 @@ enum session_stat
STAT_DUP_PKTS_BYPASS,
STAT_DUP_BYTES_BYPASS,
STAT_INJ_PKTS_FAIL, // TODO
STAT_INJ_PKTS_SUSS, // TODO
STAT_INJ_BYTES_SUSS, // TODO
// control packet
STAT_CTRL_PKTS_RX, // TODO
STAT_CTRL_BYTES_RX, // TODO

View File

@@ -8,7 +8,7 @@ extern "C"
#include <stdint.h>
uint8_t stellar_get_current_thread_index();
uint16_t stellar_get_current_thread_index();
#ifdef __cplusplus
}

View File

@@ -4,7 +4,6 @@ add_subdirectory(timestamp)
add_subdirectory(tuple)
add_subdirectory(packet)
add_subdirectory(packet_io)
add_subdirectory(thread_idx)
add_subdirectory(id_generator)
add_subdirectory(ip_reassembly)
add_subdirectory(tcp_reassembly)

View File

@@ -1,4 +1,4 @@
add_library(id_generator id_generator.cpp)
target_include_directories(id_generator PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(id_generator PUBLIC ${CMAKE_SOURCE_DIR}/src/stellar)
target_link_libraries(id_generator log thread_idx)
target_link_libraries(id_generator log core)

View File

@@ -2,7 +2,7 @@
#include <string.h>
#include "macro.h"
#include "thread_idx.h"
#include "stellar_priv.h"
#include "id_generator.h"
struct id_generator

View File

@@ -69,6 +69,8 @@ void *packet_get_io_ctx(const struct packet *pkt);
int packet_is_fragment(const struct packet *pkt);
void packet_set_ctrl(struct packet *pkt);
struct packet *packet_new(uint16_t pkt_len);
void packet_free(struct packet *pkt);
struct packet *packet_dup(const struct packet *pkt);

View File

@@ -15,7 +15,7 @@
struct dumpfile_io
{
uint8_t nr_threads;
uint16_t nr_threads;
char directory[256];
pcap_t *pcap;
@@ -124,7 +124,7 @@ static void *dumpfile_thread(void *arg)
* Public API
******************************************************************************/
struct dumpfile_io *dumpfile_io_new(const char *directory, uint8_t nr_threads)
struct dumpfile_io *dumpfile_io_new(const char *directory, uint16_t nr_threads)
{
pthread_t tid;
struct dumpfile_io *handle = (struct dumpfile_io *)calloc(1, sizeof(struct dumpfile_io));

View File

@@ -9,7 +9,7 @@ extern "C"
#include "packet_io.h"
struct dumpfile_io;
struct dumpfile_io *dumpfile_io_new(const char *directory, uint8_t nr_threads);
struct dumpfile_io *dumpfile_io_new(const char *directory, uint16_t nr_threads);
void dumpfile_io_free(struct dumpfile_io *handle);
int dumpfile_io_init(struct dumpfile_io *handle, uint16_t thr_idx);

View File

@@ -44,12 +44,12 @@ static inline int is_keepalive_packet(const char *data, int len)
* Public API
******************************************************************************/
struct marsio_io *marsio_io_new(const char *app_symbol, const char *dev_symbol, uint16_t *cpu_mask, uint8_t nr_threads)
struct marsio_io *marsio_io_new(const char *app_symbol, const char *dev_symbol, uint16_t *cpu_mask, uint16_t nr_threads)
{
int opt = 1;
cpu_set_t coremask;
CPU_ZERO(&coremask);
for (uint8_t i = 0; i < nr_threads; i++)
for (uint16_t i = 0; i < nr_threads; i++)
{
CPU_SET(cpu_mask[i], &coremask);
}

View File

@@ -9,7 +9,7 @@ extern "C"
#include "packet_io.h"
struct marsio_io;
struct marsio_io *marsio_io_new(const char *app_symbol, const char *dev_symbol, uint16_t *cpu_mask, uint8_t nr_threads);
struct marsio_io *marsio_io_new(const char *app_symbol, const char *dev_symbol, uint16_t *cpu_mask, uint16_t nr_threads);
void marsio_io_free(struct marsio_io *handle);
int marsio_io_init(struct marsio_io *handle, uint16_t thr_idx);

View File

@@ -63,7 +63,7 @@ struct packet_io_options
char app_symbol[64];
char dev_symbol[64];
uint8_t nr_threads;
uint16_t nr_threads;
uint16_t cpu_mask[MAX_THREAD_NUM];
};

View File

@@ -1,3 +1,3 @@
add_library(plugin_manager plugin_manager.cpp)
target_include_directories(plugin_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(plugin_manager session_manager thread_idx)
target_link_libraries(plugin_manager session_manager core)

View File

@@ -1,7 +1,7 @@
#include <assert.h>
#include "plugin_manager.h"
#include "session_priv.h"
#include "thread_idx.h"
#include "stellar_priv.h"
struct plugin_manager
{

View File

@@ -1,5 +1,7 @@
add_executable(stellar config.cpp stat.cpp stellar.cpp)
target_link_libraries(stellar timestamp plugin_manager session_manager ip_reassembly packet_io thread_idx)
target_link_libraries(stellar pthread fieldstat4 toml)
add_library(core config.cpp stat.cpp stellar.cpp)
target_link_libraries(core timestamp plugin_manager session_manager ip_reassembly packet_io)
target_link_libraries(core pthread fieldstat4 toml)
add_executable(stellar main.cpp)
target_link_libraries(stellar core)
install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program)

View File

@@ -122,7 +122,7 @@ static int parse_packet_io_section(toml_table_t *root, struct packet_io_options
CONFIG_LOG_ERROR("config file missing packet_io->cpu_mask");
return -1;
}
for (uint8_t i = 0; i < opts->nr_threads; i++)
for (uint16_t i = 0; i < opts->nr_threads; i++)
{
ptr = toml_raw_at(mask_array, i);
if (ptr == NULL)
@@ -489,7 +489,7 @@ void stellar_print_config(struct stellar_config *config)
CONFIG_LOG_DEBUG("packet_io->dev_symbol : %s", io_opts->dev_symbol);
}
CONFIG_LOG_DEBUG("packet_io->nr_threads : %d", io_opts->nr_threads);
for (uint8_t i = 0; i < io_opts->nr_threads; i++)
for (uint16_t i = 0; i < io_opts->nr_threads; i++)
{
CONFIG_LOG_DEBUG("packet_io->cpu_mask[%03d] : %d", i, io_opts->cpu_mask[i]);
}

130
src/stellar/main.cpp Normal file
View File

@@ -0,0 +1,130 @@
#include <stdio.h>
#include <assert.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <stdlib.h>
#include "logo.h"
#include "config.h"
#include "timestamp.h"
#include "id_generator.h"
#include "stellar_priv.h"
static const char *log_config_file = "./conf/log.toml";
static const char *stellar_config_file = "./conf/stellar.toml";
static void signal_handler(int signo)
{
if (signo == SIGINT)
{
STELLAR_LOG_STATE("SIGINT received, notify threads to exit !!!");
ATOMIC_SET(&runtime->need_exit, 1);
}
if (signo == SIGQUIT)
{
STELLAR_LOG_STATE("SIGQUIT received, notify threads to exit !!!");
ATOMIC_SET(&runtime->need_exit, 1);
}
if (signo == SIGTERM)
{
STELLAR_LOG_STATE("SIGTERM received, notify threads to exit !!!");
ATOMIC_SET(&runtime->need_exit, 1);
}
if (signo == SIGHUP)
{
STELLAR_LOG_STATE("SIGHUP received, reload log level !!!");
log_reload_level(log_config_file);
}
}
int main(int argc, char **argv)
{
timestamp_update();
signal(SIGINT, signal_handler);
signal(SIGQUIT, signal_handler);
signal(SIGTERM, signal_handler);
signal(SIGHUP, signal_handler);
if (log_init(log_config_file) != 0)
{
STELLAR_LOG_ERROR("unable to init log");
goto error_out;
}
STELLAR_LOG_STATE("start stellar (version: %s)\n %s", __stellar_version, logo_str);
if (stellar_load_config(stellar_config_file, config) != 0)
{
STELLAR_LOG_ERROR("unable to load config file");
goto error_out;
}
stellar_print_config(config);
STELLAR_LOG_DEBUG("sizeof(struct session) = %lu bytes", sizeof(struct session));
if (id_generator_init(config->dev_opts.base, config->dev_opts.offset) != 0)
{
STELLAR_LOG_ERROR("unable to init id generator");
goto error_out;
}
runtime->stat = stellar_stat_new(config->io_opts.nr_threads);
if (runtime->stat == NULL)
{
STELLAR_LOG_ERROR("unable to create stellar stat");
goto error_out;
}
runtime->plug_mgr = plugin_manager_new();
if (runtime->plug_mgr == NULL)
{
STELLAR_LOG_ERROR("unable to create plugin manager");
goto error_out;
}
runtime->packet_io = packet_io_new(&config->io_opts);
if (runtime->packet_io == NULL)
{
STELLAR_LOG_ERROR("unable to create packet io");
goto error_out;
}
if (stellar_thread_init(runtime, config) != 0)
{
STELLAR_LOG_ERROR("unable to init thread context");
goto error_out;
}
if (stellar_thread_run(runtime, config) != 0)
{
STELLAR_LOG_ERROR("unable to create worker thread");
goto error_out;
}
runtime->stat_last_output_ts = timestamp_get_msec();
while (!ATOMIC_READ(&runtime->need_exit))
{
timestamp_update();
if (timestamp_get_msec() - runtime->stat_last_output_ts > 2000)
{
runtime->stat_last_output_ts = timestamp_get_msec();
stellar_stat_output(runtime->stat);
}
usleep(1000); // 1ms
}
error_out:
stellar_thread_join(runtime, config);
stellar_thread_clean(runtime, config);
packet_io_free(runtime->packet_io);
plugin_manager_free(runtime->plug_mgr);
stellar_stat_free(runtime->stat);
STELLAR_LOG_STATE("stellar exit !!!\n");
log_free();
return 0;
}

View File

@@ -1,84 +1,18 @@
#include <stdio.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/prctl.h>
#include "logo.h"
#include "stat.h"
#include "config.h"
#include "packet_priv.h"
#include "packet_io.h"
#include "timestamp.h"
#include "thread_idx.h"
#include "id_generator.h"
#include "ip_reassembly.h"
#include "session_manager.h"
#include "plugin_manager.h"
#include "stellar_priv.h"
#define STELLAR_LOG_STATE(format, ...) LOG_STATE("stellar", format, ##__VA_ARGS__)
#define STELLAR_LOG_ERROR(format, ...) LOG_ERROR("stellar", format, ##__VA_ARGS__)
#define STELLAR_LOG_DEBUG(format, ...) LOG_DEBUG("stellar", format, ##__VA_ARGS__)
struct stellar_thread
{
pthread_t tid;
uint16_t idx;
uint64_t is_runing;
uint64_t timing_wheel_last_update_ts;
struct ip_reassembly *ip_mgr;
struct session_manager *sess_mgr;
};
struct stellar_runtime
{
uint64_t need_exit;
uint64_t stat_last_output_ts;
struct stellar_stat *stat;
struct packet_io *packet_io;
struct plugin_manager *plug_mgr;
struct stellar_thread threads[MAX_THREAD_NUM];
};
struct stellar_runtime __runtime = {0};
struct stellar_runtime *runtime = &__runtime;
struct stellar_config __config = {0};
struct stellar_config *config = &__config;
static const char *log_config_file = "./conf/log.toml";
static const char *stellar_config_file = "./conf/stellar.toml";
static void signal_handler(int signo)
{
if (signo == SIGINT)
{
STELLAR_LOG_STATE("SIGINT received, notify threads to exit !!!");
ATOMIC_SET(&runtime->need_exit, 1);
}
if (signo == SIGQUIT)
{
STELLAR_LOG_STATE("SIGQUIT received, notify threads to exit !!!");
ATOMIC_SET(&runtime->need_exit, 1);
}
if (signo == SIGTERM)
{
STELLAR_LOG_STATE("SIGTERM received, notify threads to exit !!!");
ATOMIC_SET(&runtime->need_exit, 1);
}
if (signo == SIGHUP)
{
STELLAR_LOG_STATE("SIGHUP received, reload log level !!!");
log_reload_level(log_config_file);
}
}
static void update_session_stat(struct session *sess, struct packet *pkt)
{
if (sess)
@@ -275,12 +209,24 @@ static void *work_thread(void *arg)
return NULL;
}
static int stellar_thread_init(struct stellar_runtime *ctx, uint8_t nr_threads)
static thread_local uint16_t __thread_id = 0;
uint16_t stellar_get_current_thread_index()
{
return __thread_id;
}
void stellar_set_current_thread_index(uint16_t idx)
{
__thread_id = idx;
}
int stellar_thread_init(struct stellar_runtime *runtime, struct stellar_config *config)
{
uint64_t now = timestamp_get_msec();
for (uint8_t i = 0; i < nr_threads; i++)
for (uint16_t i = 0; i < config->io_opts.nr_threads; i++)
{
struct stellar_thread *thread = &ctx->threads[i];
struct stellar_thread *thread = &runtime->threads[i];
thread->idx = i;
thread->is_runing = 0;
thread->timing_wheel_last_update_ts = now;
@@ -301,11 +247,11 @@ static int stellar_thread_init(struct stellar_runtime *ctx, uint8_t nr_threads)
return 0;
}
static void stellar_thread_clean(struct stellar_runtime *ctx, uint8_t nr_threads)
void stellar_thread_clean(struct stellar_runtime *runtime, struct stellar_config *config)
{
for (uint8_t i = 0; i < nr_threads; i++)
for (uint16_t i = 0; i < config->io_opts.nr_threads; i++)
{
struct stellar_thread *thread = &ctx->threads[i];
struct stellar_thread *thread = &runtime->threads[i];
if (ATOMIC_READ(&thread->is_runing) == 0)
{
STELLAR_LOG_STATE("wait worker thread %d free context", i);
@@ -315,11 +261,11 @@ static void stellar_thread_clean(struct stellar_runtime *ctx, uint8_t nr_threads
}
}
static int stellar_thread_run(struct stellar_runtime *ctx, uint8_t nr_threads)
int stellar_thread_run(struct stellar_runtime *runtime, struct stellar_config *config)
{
for (uint8_t i = 0; i < nr_threads; i++)
for (uint16_t i = 0; i < config->io_opts.nr_threads; i++)
{
struct stellar_thread *thread = &ctx->threads[i];
struct stellar_thread *thread = &runtime->threads[i];
if (pthread_create(&thread->tid, NULL, work_thread, (void *)thread) < 0)
{
STELLAR_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno));
@@ -330,11 +276,11 @@ static int stellar_thread_run(struct stellar_runtime *ctx, uint8_t nr_threads)
return 0;
}
static void stellar_thread_join(struct stellar_runtime *ctx, uint8_t nr_threads)
void stellar_thread_join(struct stellar_runtime *runtime, struct stellar_config *config)
{
for (uint8_t i = 0; i < nr_threads; i++)
for (uint16_t i = 0; i < config->io_opts.nr_threads; i++)
{
struct stellar_thread *thread = &ctx->threads[i];
struct stellar_thread *thread = &runtime->threads[i];
while (ATOMIC_READ(&thread->is_runing) == 1)
{
STELLAR_LOG_STATE("wait worker thread %d stop", i);
@@ -342,92 +288,3 @@ static void stellar_thread_join(struct stellar_runtime *ctx, uint8_t nr_threads)
}
}
}
int main(int argc, char **argv)
{
uint8_t nr_threads = 0;
timestamp_update();
signal(SIGINT, signal_handler);
signal(SIGQUIT, signal_handler);
signal(SIGTERM, signal_handler);
signal(SIGHUP, signal_handler);
if (log_init(log_config_file) != 0)
{
STELLAR_LOG_ERROR("unable to init log");
goto error_out;
}
STELLAR_LOG_STATE("start stellar (version: %s)\n %s", __stellar_version, logo_str);
if (stellar_load_config(stellar_config_file, config) != 0)
{
STELLAR_LOG_ERROR("unable to load config file");
goto error_out;
}
stellar_print_config(config);
STELLAR_LOG_DEBUG("sizeof(struct session) = %lu bytes", sizeof(struct session));
nr_threads = config->io_opts.nr_threads;
if (id_generator_init(config->dev_opts.base, config->dev_opts.offset) != 0)
{
STELLAR_LOG_ERROR("unable to init id generator");
goto error_out;
}
runtime->stat = stellar_stat_new(nr_threads);
if (runtime->stat == NULL)
{
STELLAR_LOG_ERROR("unable to create stellar stat");
goto error_out;
}
runtime->plug_mgr = plugin_manager_new();
if (runtime->plug_mgr == NULL)
{
STELLAR_LOG_ERROR("unable to create plugin manager");
goto error_out;
}
runtime->packet_io = packet_io_new(&config->io_opts);
if (runtime->packet_io == NULL)
{
STELLAR_LOG_ERROR("unable to create packet io");
goto error_out;
}
if (stellar_thread_init(runtime, nr_threads) != 0)
{
STELLAR_LOG_ERROR("unable to init thread context");
goto error_out;
}
if (stellar_thread_run(runtime, nr_threads) != 0)
{
STELLAR_LOG_ERROR("unable to create worker thread");
goto error_out;
}
runtime->stat_last_output_ts = timestamp_get_msec();
while (!ATOMIC_READ(&runtime->need_exit))
{
timestamp_update();
if (timestamp_get_msec() - runtime->stat_last_output_ts > 2000)
{
runtime->stat_last_output_ts = timestamp_get_msec();
stellar_stat_output(runtime->stat);
}
usleep(1000); // 1ms
}
error_out:
stellar_thread_join(runtime, nr_threads);
stellar_thread_clean(runtime, nr_threads);
packet_io_free(runtime->packet_io);
plugin_manager_free(runtime->plug_mgr);
stellar_stat_free(runtime->stat);
STELLAR_LOG_STATE("stellar exit !!!\n");
log_free();
return 0;
}

View File

@@ -0,0 +1,51 @@
#ifndef _STELLAR_PRIV_H
#define _STELLAR_PRIV_H
#ifdef __cplusplus
extern "C"
{
#endif
#include "stat.h"
#include "plugin_manager.h"
#include "stellar/stellar.h"
#define STELLAR_LOG_STATE(format, ...) LOG_STATE("stellar", format, ##__VA_ARGS__)
#define STELLAR_LOG_ERROR(format, ...) LOG_ERROR("stellar", format, ##__VA_ARGS__)
#define STELLAR_LOG_DEBUG(format, ...) LOG_DEBUG("stellar", format, ##__VA_ARGS__)
struct stellar_thread
{
pthread_t tid;
uint16_t idx;
uint64_t is_runing;
uint64_t timing_wheel_last_update_ts;
struct ip_reassembly *ip_mgr;
struct session_manager *sess_mgr;
};
struct stellar_runtime
{
uint64_t need_exit;
uint64_t stat_last_output_ts;
struct stellar_stat *stat;
struct packet_io *packet_io;
struct plugin_manager *plug_mgr;
struct stellar_thread threads[MAX_THREAD_NUM];
};
extern struct stellar_runtime *runtime;
extern struct stellar_config *config;
void stellar_set_current_thread_index(uint16_t idx);
int stellar_thread_init(struct stellar_runtime *runtime, struct stellar_config *config);
void stellar_thread_clean(struct stellar_runtime *runtime, struct stellar_config *config);
int stellar_thread_run(struct stellar_runtime *runtime, struct stellar_config *config);
void stellar_thread_join(struct stellar_runtime *runtime, struct stellar_config *config);
#ifdef __cplusplus
}
#endif
#endif

View File

@@ -1,4 +0,0 @@
add_library(thread_idx thread_idx.cpp)
target_include_directories(thread_idx PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(thread_idx PUBLIC ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(thread_idx)

View File

@@ -1,13 +0,0 @@
#include "thread_idx.h"
static thread_local uint8_t __thread_id = 0;
uint8_t stellar_get_current_thread_index()
{
return __thread_id;
}
void stellar_set_current_thread_index(uint8_t idx)
{
__thread_id = idx;
}

View File

@@ -1,17 +0,0 @@
#ifndef _THREAD_IDX_H
#define _THREAD_IDX_H
#ifdef __cplusplus
extern "C"
{
#endif
#include "stellar/stellar.h"
void stellar_set_current_thread_index(uint8_t idx);
#ifdef __cplusplus
}
#endif
#endif