Add packet IO module

* support marsio
    * support dumpfile ( 1 thread read dumpfile & N thread handle packet)
This commit is contained in:
luwenpeng
2024-02-28 16:30:03 +08:00
parent 2e748e0821
commit 7952ae7283
32 changed files with 1548 additions and 467 deletions

View File

@@ -1,13 +1,22 @@
[system]
app_symbol = stellar
dev_symbol = nf_0_fw
[device]
device_base = 1 # [0, 31]
device_offset = 2 # [0, 127]
[packet_io]
mode = dumpfile # dumpfile, marsio
dumpfile_dir = "/tmp/dumpfile/"
app_symbol = stellar
dev_symbol = nf_0_fw
nr_threads = 1 # [1, 256]
cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12]
[ip_reassemble]
enable = 1
timeout = 10 # seconds
bucket_entries = 8
bucket_num = 4096
[session_manager]
# max session number
max_tcp_session_num = 100

View File

@@ -1,9 +1,11 @@
add_subdirectory(log)
add_subdirectory(file)
add_subdirectory(timestamp)
add_subdirectory(tuple)
add_subdirectory(packet)
add_subdirectory(ip_reassemble)
add_subdirectory(packet_io)
add_subdirectory(id_generator)
add_subdirectory(ip_reassemble)
add_subdirectory(dupkt_filter)
add_subdirectory(eviction_filter)
add_subdirectory(session)

View File

@@ -4,4 +4,4 @@
add_library(config config.cpp)
target_include_directories(config PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(config toml session_manager)
target_link_libraries(config toml packet_io ip_reassemble session_manager)

View File

@@ -7,79 +7,130 @@
// return 0: success
// retuun -1: failed
static int parse_system_section(struct config *cfg, toml_table_t *conf_file_handle)
static int parse_device_config(struct device_config *dev_cfg, toml_table_t *conf_file_handle)
{
const char *ptr;
toml_table_t *system_table;
toml_table_t *device_table;
device_table = toml_table_in(conf_file_handle, "device");
if (device_table == NULL)
{
CONFIG_LOG_ERROR("config file missing device section");
return -1;
}
ptr = toml_raw_in(device_table, "device_base");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing device.device_base");
return -1;
}
dev_cfg->device_base = atoi(ptr);
ptr = toml_raw_in(device_table, "device_offset");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing device.device_offset");
return -1;
}
dev_cfg->device_offset = atoi(ptr);
return 0;
}
// return 0: success
// retuun -1: failed
static int parse_packet_io_config(struct packet_io_config *pkt_io_cfg, toml_table_t *conf_file_handle)
{
const char *ptr;
toml_table_t *packet_io_table;
toml_array_t *mask_array;
system_table = toml_table_in(conf_file_handle, "system");
if (system_table == NULL)
packet_io_table = toml_table_in(conf_file_handle, "packet_io");
if (packet_io_table == NULL)
{
CONFIG_LOG_ERROR("config file missing system section");
CONFIG_LOG_ERROR("config file missing packet_io section");
return -1;
}
ptr = toml_raw_in(system_table, "app_symbol");
ptr = toml_raw_in(packet_io_table, "mode");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing system.app_symbol");
CONFIG_LOG_ERROR("config file missing packet_io.mode");
return -1;
}
strncpy(cfg->sys_cfg.app_symbol, ptr, sizeof(cfg->sys_cfg.app_symbol) - 1);
ptr = toml_raw_in(system_table, "dev_symbol");
if (ptr == NULL)
if (strcmp(ptr, "dumpfile") == 0)
{
CONFIG_LOG_ERROR("config file missing system.dev_symbol");
pkt_io_cfg->mode = PACKET_IO_DUMPFILE;
}
else if (strcmp(ptr, "marsio") == 0)
{
pkt_io_cfg->mode = PACKET_IO_MARSIO;
}
else
{
CONFIG_LOG_ERROR("config file invalid packet_io.mode %s, only support dumpfile and marsio", ptr);
return -1;
}
strncpy(cfg->sys_cfg.dev_symbol, ptr, sizeof(cfg->sys_cfg.dev_symbol) - 1);
ptr = toml_raw_in(system_table, "device_base");
if (ptr == NULL)
if (pkt_io_cfg->mode == PACKET_IO_DUMPFILE)
{
CONFIG_LOG_ERROR("config file missing system.device_base");
return -1;
ptr = toml_raw_in(packet_io_table, "dumpfile_dir");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing packet_io.dumpfile_dir");
return -1;
}
// skip ""
strncpy(pkt_io_cfg->dumpfile_dir, ptr + 1, strlen(ptr) - 2);
}
cfg->sys_cfg.device_base = atoi(ptr);
ptr = toml_raw_in(system_table, "device_offset");
if (ptr == NULL)
else
{
CONFIG_LOG_ERROR("config file missing system.device_offset");
return -1;
ptr = toml_raw_in(packet_io_table, "app_symbol");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing packet_io.app_symbol");
return -1;
}
strncpy(pkt_io_cfg->app_symbol, ptr, sizeof(pkt_io_cfg->app_symbol) - 1);
ptr = toml_raw_in(packet_io_table, "dev_symbol");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing packet_io.dev_symbol");
return -1;
}
strncpy(pkt_io_cfg->dev_symbol, ptr, sizeof(pkt_io_cfg->dev_symbol) - 1);
}
cfg->sys_cfg.device_offset = atoi(ptr);
ptr = toml_raw_in(system_table, "nr_threads");
ptr = toml_raw_in(packet_io_table, "nr_threads");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing system.nr_threads");
CONFIG_LOG_ERROR("config file missing packet_io.nr_threads");
return -1;
}
if (atoi(ptr) <= 0 || atoi(ptr) > MAX_THREAD_NUM)
{
CONFIG_LOG_ERROR("config file invalid system.nr_threads %d, range [1, %d]", atoi(ptr), MAX_THREAD_NUM);
CONFIG_LOG_ERROR("config file invalid packet_io.nr_threads %d, range [1, %d]", atoi(ptr), MAX_THREAD_NUM);
return -1;
}
cfg->sys_cfg.nr_threads = atoi(ptr);
pkt_io_cfg->nr_threads = atoi(ptr);
mask_array = toml_array_in(system_table, "cpu_mask");
mask_array = toml_array_in(packet_io_table, "cpu_mask");
if (mask_array == NULL)
{
CONFIG_LOG_ERROR("config file missing system.cpu_mask");
CONFIG_LOG_ERROR("config file missing packet_io.cpu_mask");
return -1;
}
for (uint8_t i = 0; i < cfg->sys_cfg.nr_threads; i++)
for (uint8_t i = 0; i < pkt_io_cfg->nr_threads; i++)
{
ptr = toml_raw_at(mask_array, i);
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing system.cpu_mask[%d]", i);
CONFIG_LOG_ERROR("config file missing packet_io.cpu_mask[%d]", i);
return -1;
}
cfg->sys_cfg.cpu_mask[i] = atoi(ptr);
pkt_io_cfg->cpu_mask[i] = atoi(ptr);
}
return 0;
@@ -87,7 +138,56 @@ static int parse_system_section(struct config *cfg, toml_table_t *conf_file_hand
// return 0: success
// retuun -1: failed
static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_file_handle)
static int parse_ip_reassemble_config(struct ip_reassemble_config *ip_reass_cfg, toml_table_t *conf_file_handle)
{
const char *ptr;
toml_table_t *ip_reass_table;
ip_reass_table = toml_table_in(conf_file_handle, "ip_reassemble");
if (ip_reass_table == NULL)
{
CONFIG_LOG_ERROR("config file missing ip_reassemble section");
return -1;
}
ptr = toml_raw_in(ip_reass_table, "enable");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing ip_reassemble.enable");
return -1;
}
ip_reass_cfg->enable = atoi(ptr);
ptr = toml_raw_in(ip_reass_table, "timeout");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing ip_reassemble.timeout");
return -1;
}
ip_reass_cfg->timeout = atoi(ptr);
ptr = toml_raw_in(ip_reass_table, "bucket_entries");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing ip_reassemble.bucket_entries");
return -1;
}
ip_reass_cfg->bucket_entries = atoi(ptr);
ptr = toml_raw_in(ip_reass_table, "bucket_num");
if (ptr == NULL)
{
CONFIG_LOG_ERROR("config file missing ip_reassemble.bucket_num");
return -1;
}
ip_reass_cfg->bucket_num = atoi(ptr);
return 0;
}
// return 0: success
// retuun -1: failed
static int parse_session_manager_section(struct session_manager_config *sess_mgr_cfg, toml_table_t *conf_file_handle)
{
const char *ptr;
toml_table_t *sess_mgr_table;
@@ -106,7 +206,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.max_tcp_session_num");
return -1;
}
cfg->sess_mgr_cfg.max_tcp_session_num = atoll(ptr);
sess_mgr_cfg->max_tcp_session_num = atoll(ptr);
ptr = toml_raw_in(sess_mgr_table, "max_udp_session_num");
if (ptr == NULL)
@@ -114,7 +214,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.max_udp_session_num");
return -1;
}
cfg->sess_mgr_cfg.max_udp_session_num = atoll(ptr);
sess_mgr_cfg->max_udp_session_num = atoll(ptr);
// session overload (1: evict old session, 0: bypass new session)
ptr = toml_raw_in(sess_mgr_table, "tcp_overload_evict_old_sess");
@@ -123,7 +223,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.tcp_overload_evict_old_sess");
return -1;
}
cfg->sess_mgr_cfg.tcp_overload_evict_old_sess = atoi(ptr);
sess_mgr_cfg->tcp_overload_evict_old_sess = atoi(ptr);
ptr = toml_raw_in(sess_mgr_table, "udp_overload_evict_old_sess");
if (ptr == NULL)
@@ -131,7 +231,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.udp_overload_evict_old_sess");
return -1;
}
cfg->sess_mgr_cfg.udp_overload_evict_old_sess = atoi(ptr);
sess_mgr_cfg->udp_overload_evict_old_sess = atoi(ptr);
// TCP timeout
ptr = toml_raw_in(sess_mgr_table, "tcp_timeout_init");
@@ -140,7 +240,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.tcp_timeout_init");
return -1;
}
cfg->sess_mgr_cfg.tcp_timeout_init = atoll(ptr);
sess_mgr_cfg->tcp_timeout_init = atoll(ptr);
ptr = toml_raw_in(sess_mgr_table, "tcp_timeout_handshake");
if (ptr == NULL)
@@ -148,7 +248,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.tcp_timeout_handshake");
return -1;
}
cfg->sess_mgr_cfg.tcp_timeout_handshake = atoll(ptr);
sess_mgr_cfg->tcp_timeout_handshake = atoll(ptr);
ptr = toml_raw_in(sess_mgr_table, "tcp_timeout_data");
if (ptr == NULL)
@@ -156,7 +256,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.tcp_timeout_data");
return -1;
}
cfg->sess_mgr_cfg.tcp_timeout_data = atoll(ptr);
sess_mgr_cfg->tcp_timeout_data = atoll(ptr);
ptr = toml_raw_in(sess_mgr_table, "tcp_timeout_half_closed");
if (ptr == NULL)
@@ -164,7 +264,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.tcp_timeout_half_closed");
return -1;
}
cfg->sess_mgr_cfg.tcp_timeout_half_closed = atoll(ptr);
sess_mgr_cfg->tcp_timeout_half_closed = atoll(ptr);
ptr = toml_raw_in(sess_mgr_table, "tcp_timeout_time_wait");
if (ptr == NULL)
@@ -172,7 +272,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.tcp_timeout_time_wait");
return -1;
}
cfg->sess_mgr_cfg.tcp_timeout_time_wait = atoll(ptr);
sess_mgr_cfg->tcp_timeout_time_wait = atoll(ptr);
ptr = toml_raw_in(sess_mgr_table, "tcp_timeout_discard");
if (ptr == NULL)
@@ -180,7 +280,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.tcp_timeout_discard");
return -1;
}
cfg->sess_mgr_cfg.tcp_timeout_discard = atoll(ptr);
sess_mgr_cfg->tcp_timeout_discard = atoll(ptr);
// UDP timeout
ptr = toml_raw_in(sess_mgr_table, "udp_timeout_data");
@@ -189,7 +289,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.udp_timeout_data");
return -1;
}
cfg->sess_mgr_cfg.udp_timeout_data = atoll(ptr);
sess_mgr_cfg->udp_timeout_data = atoll(ptr);
// TCP duplicate packet filter
ptr = toml_raw_in(sess_mgr_table, "tcp_dupkt_filter_enable");
@@ -198,7 +298,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.tcp_dupkt_filter_enable");
return -1;
}
cfg->sess_mgr_cfg.tcp_dupkt_filter_enable = atoi(ptr);
sess_mgr_cfg->tcp_dupkt_filter_enable = atoi(ptr);
ptr = toml_raw_in(sess_mgr_table, "tcp_dupkt_filter_capacity");
if (ptr == NULL)
@@ -206,7 +306,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.tcp_dupkt_filter_capacity");
return -1;
}
cfg->sess_mgr_cfg.tcp_dupkt_filter_capacity = atoll(ptr);
sess_mgr_cfg->tcp_dupkt_filter_capacity = atoll(ptr);
ptr = toml_raw_in(sess_mgr_table, "tcp_dupkt_filter_timeout");
if (ptr == NULL)
@@ -214,7 +314,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.tcp_dupkt_filter_timeout");
return -1;
}
cfg->sess_mgr_cfg.tcp_dupkt_filter_timeout = atoll(ptr);
sess_mgr_cfg->tcp_dupkt_filter_timeout = atoll(ptr);
ptr = toml_raw_in(sess_mgr_table, "tcp_dupkt_filter_error_rate");
if (ptr == NULL)
@@ -222,7 +322,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.tcp_dupkt_filter_error_rate");
return -1;
}
cfg->sess_mgr_cfg.tcp_dupkt_filter_error_rate = atof(ptr);
sess_mgr_cfg->tcp_dupkt_filter_error_rate = atof(ptr);
// UDP eviction filter
ptr = toml_raw_in(sess_mgr_table, "udp_eviction_filter_enable");
@@ -231,7 +331,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.udp_eviction_filter_enable");
return -1;
}
cfg->sess_mgr_cfg.udp_eviction_filter_enable = atoi(ptr);
sess_mgr_cfg->udp_eviction_filter_enable = atoi(ptr);
ptr = toml_raw_in(sess_mgr_table, "udp_eviction_filter_capacity");
if (ptr == NULL)
@@ -239,7 +339,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.udp_eviction_filter_capacity");
return -1;
}
cfg->sess_mgr_cfg.udp_eviction_filter_capacity = atoll(ptr);
sess_mgr_cfg->udp_eviction_filter_capacity = atoll(ptr);
ptr = toml_raw_in(sess_mgr_table, "udp_eviction_filter_timeout");
if (ptr == NULL)
@@ -247,7 +347,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.udp_eviction_filter_timeout");
return -1;
}
cfg->sess_mgr_cfg.udp_eviction_filter_timeout = atoll(ptr);
sess_mgr_cfg->udp_eviction_filter_timeout = atoll(ptr);
ptr = toml_raw_in(sess_mgr_table, "udp_eviction_filter_error_rate");
if (ptr == NULL)
@@ -255,7 +355,7 @@ static int parse_session_manager_section(struct config *cfg, toml_table_t *conf_
CONFIG_LOG_ERROR("config file missing session_manager.udp_eviction_filter_error_rate");
return -1;
}
cfg->sess_mgr_cfg.udp_eviction_filter_error_rate = atof(ptr);
sess_mgr_cfg->udp_eviction_filter_error_rate = atof(ptr);
return 0;
}
@@ -285,14 +385,22 @@ int config_load(struct config *cfg, const char *cfg_file)
goto error_out;
}
// system config
if (parse_system_section(cfg, conf_file_handle) != 0)
if (parse_device_config(&cfg->dev_cfg, conf_file_handle) != 0)
{
goto error_out;
}
// session manager config
if (parse_session_manager_section(cfg, conf_file_handle) != 0)
if (parse_packet_io_config(&cfg->pkt_io_cfg, conf_file_handle) != 0)
{
goto error_out;
}
if (parse_ip_reassemble_config(&cfg->ip_reass_cfg, conf_file_handle) != 0)
{
goto error_out;
}
if (parse_session_manager_section(&cfg->sess_mgr_cfg, conf_file_handle) != 0)
{
goto error_out;
}
@@ -320,38 +428,61 @@ void config_dump(struct config *cfg)
return;
}
CONFIG_LOG_DEBUG("system.app_symbol : %s", cfg->sys_cfg.app_symbol);
CONFIG_LOG_DEBUG("system.dev_symbol : %s", cfg->sys_cfg.dev_symbol);
CONFIG_LOG_DEBUG("system.device_base : %d", cfg->sys_cfg.device_base);
CONFIG_LOG_DEBUG("system.device_offset : %d", cfg->sys_cfg.device_offset);
CONFIG_LOG_DEBUG("system.nr_threads : %d", cfg->sys_cfg.nr_threads);
for (uint8_t i = 0; i < cfg->sys_cfg.nr_threads; i++)
struct device_config *dev_cfg = &cfg->dev_cfg;
struct packet_io_config *pkt_io_cfg = &cfg->pkt_io_cfg;
struct ip_reassemble_config *ip_reass_cfg = &cfg->ip_reass_cfg;
struct session_manager_config *sess_mgr_cfg = &cfg->sess_mgr_cfg;
// device config
CONFIG_LOG_DEBUG("device->device_base : %d", dev_cfg->device_base);
CONFIG_LOG_DEBUG("device->device_offset : %d", dev_cfg->device_offset);
// packet io config
CONFIG_LOG_DEBUG("packet_io->mode : %s", pkt_io_cfg->mode == PACKET_IO_DUMPFILE ? "dumpfile" : "marsio");
if (pkt_io_cfg->mode == PACKET_IO_DUMPFILE)
{
CONFIG_LOG_DEBUG("system.cpu_mask[%d] : %d", i, cfg->sys_cfg.cpu_mask[i]);
CONFIG_LOG_DEBUG("packet_io->dumpfile_dir : %s", pkt_io_cfg->dumpfile_dir);
}
else
{
CONFIG_LOG_DEBUG("packet_io->app_symbol : %s", pkt_io_cfg->app_symbol);
CONFIG_LOG_DEBUG("packet_io->dev_symbol : %s", pkt_io_cfg->dev_symbol);
}
CONFIG_LOG_DEBUG("packet_io->nr_threads : %d", pkt_io_cfg->nr_threads);
for (uint8_t i = 0; i < pkt_io_cfg->nr_threads; i++)
{
CONFIG_LOG_DEBUG("packet_io->cpu_mask[%03d] : %d", i, pkt_io_cfg->cpu_mask[i]);
}
CONFIG_LOG_DEBUG("session_manager.max_tcp_session_num : %ld", cfg->sess_mgr_cfg.max_tcp_session_num);
CONFIG_LOG_DEBUG("session_manager.max_udp_session_num : %ld", cfg->sess_mgr_cfg.max_udp_session_num);
// ip reassemble config
CONFIG_LOG_DEBUG("ip_reassemble->enable : %d", ip_reass_cfg->enable);
CONFIG_LOG_DEBUG("ip_reassemble->timeout : %d", ip_reass_cfg->timeout);
CONFIG_LOG_DEBUG("ip_reassemble->bucket_entries : %d", ip_reass_cfg->bucket_entries);
CONFIG_LOG_DEBUG("ip_reassemble->bucket_num : %d", ip_reass_cfg->bucket_num);
CONFIG_LOG_DEBUG("session_manager.tcp_overload_evict_old_sess : %d", cfg->sess_mgr_cfg.tcp_overload_evict_old_sess);
CONFIG_LOG_DEBUG("session_manager.udp_overload_evict_old_sess : %d", cfg->sess_mgr_cfg.udp_overload_evict_old_sess);
// session manager config
CONFIG_LOG_DEBUG("session_manager->max_tcp_session_num : %ld", sess_mgr_cfg->max_tcp_session_num);
CONFIG_LOG_DEBUG("session_manager->max_udp_session_num : %ld", sess_mgr_cfg->max_udp_session_num);
CONFIG_LOG_DEBUG("session_manager.tcp_timeout_init : %ld", cfg->sess_mgr_cfg.tcp_timeout_init);
CONFIG_LOG_DEBUG("session_manager.tcp_timeout_handshake : %ld", cfg->sess_mgr_cfg.tcp_timeout_handshake);
CONFIG_LOG_DEBUG("session_manager.tcp_timeout_data : %ld", cfg->sess_mgr_cfg.tcp_timeout_data);
CONFIG_LOG_DEBUG("session_manager.tcp_timeout_half_closed : %ld", cfg->sess_mgr_cfg.tcp_timeout_half_closed);
CONFIG_LOG_DEBUG("session_manager.tcp_timeout_time_wait : %ld", cfg->sess_mgr_cfg.tcp_timeout_time_wait);
CONFIG_LOG_DEBUG("session_manager.tcp_timeout_discard : %ld", cfg->sess_mgr_cfg.tcp_timeout_discard);
CONFIG_LOG_DEBUG("session_manager->tcp_overload_evict_old_sess : %d", sess_mgr_cfg->tcp_overload_evict_old_sess);
CONFIG_LOG_DEBUG("session_manager->udp_overload_evict_old_sess : %d", sess_mgr_cfg->udp_overload_evict_old_sess);
CONFIG_LOG_DEBUG("session_manager.udp_timeout_data : %ld", cfg->sess_mgr_cfg.udp_timeout_data);
CONFIG_LOG_DEBUG("session_manager->tcp_timeout_init : %ld", sess_mgr_cfg->tcp_timeout_init);
CONFIG_LOG_DEBUG("session_manager->tcp_timeout_handshake : %ld", sess_mgr_cfg->tcp_timeout_handshake);
CONFIG_LOG_DEBUG("session_manager->tcp_timeout_data : %ld", sess_mgr_cfg->tcp_timeout_data);
CONFIG_LOG_DEBUG("session_manager->tcp_timeout_half_closed : %ld", sess_mgr_cfg->tcp_timeout_half_closed);
CONFIG_LOG_DEBUG("session_manager->tcp_timeout_time_wait : %ld", sess_mgr_cfg->tcp_timeout_time_wait);
CONFIG_LOG_DEBUG("session_manager->tcp_timeout_discard : %ld", sess_mgr_cfg->tcp_timeout_discard);
CONFIG_LOG_DEBUG("session_manager.tcp_dupkt_filter_enable : %d", cfg->sess_mgr_cfg.tcp_dupkt_filter_enable);
CONFIG_LOG_DEBUG("session_manager.tcp_dupkt_filter_capacity : %ld", cfg->sess_mgr_cfg.tcp_dupkt_filter_capacity);
CONFIG_LOG_DEBUG("session_manager.tcp_dupkt_filter_timeout : %ld", cfg->sess_mgr_cfg.tcp_dupkt_filter_timeout);
CONFIG_LOG_DEBUG("session_manager.tcp_dupkt_filter_error_rate : %f", cfg->sess_mgr_cfg.tcp_dupkt_filter_error_rate);
CONFIG_LOG_DEBUG("session_manager->udp_timeout_data : %ld", sess_mgr_cfg->udp_timeout_data);
CONFIG_LOG_DEBUG("session_manager.udp_eviction_filter_enable : %d", cfg->sess_mgr_cfg.udp_eviction_filter_enable);
CONFIG_LOG_DEBUG("session_manager.udp_eviction_filter_capacity : %ld", cfg->sess_mgr_cfg.udp_eviction_filter_capacity);
CONFIG_LOG_DEBUG("session_manager.udp_eviction_filter_timeout : %ld", cfg->sess_mgr_cfg.udp_eviction_filter_timeout);
CONFIG_LOG_DEBUG("session_manager.udp_eviction_filter_error_rate : %f", cfg->sess_mgr_cfg.udp_eviction_filter_error_rate);
CONFIG_LOG_DEBUG("session_manager->tcp_dupkt_filter_enable : %d", sess_mgr_cfg->tcp_dupkt_filter_enable);
CONFIG_LOG_DEBUG("session_manager->tcp_dupkt_filter_capacity : %ld", sess_mgr_cfg->tcp_dupkt_filter_capacity);
CONFIG_LOG_DEBUG("session_manager->tcp_dupkt_filter_timeout : %ld", sess_mgr_cfg->tcp_dupkt_filter_timeout);
CONFIG_LOG_DEBUG("session_manager->tcp_dupkt_filter_error_rate : %f", sess_mgr_cfg->tcp_dupkt_filter_error_rate);
CONFIG_LOG_DEBUG("session_manager->udp_eviction_filter_enable : %d", sess_mgr_cfg->udp_eviction_filter_enable);
CONFIG_LOG_DEBUG("session_manager->udp_eviction_filter_capacity : %ld", sess_mgr_cfg->udp_eviction_filter_capacity);
CONFIG_LOG_DEBUG("session_manager->udp_eviction_filter_timeout : %ld", sess_mgr_cfg->udp_eviction_filter_timeout);
CONFIG_LOG_DEBUG("session_manager->udp_eviction_filter_error_rate : %f", sess_mgr_cfg->udp_eviction_filter_error_rate);
}

View File

@@ -6,8 +6,10 @@ extern "C"
{
#endif
#include "session_manager.h"
#include "stellar.h"
#include "packet_io.h"
#include "ip_reassemble.h"
#include "session_manager.h"
#define CONFIG_LOG_ERROR(format, ...) LOG_ERROR("config", format, ##__VA_ARGS__)
#ifndef CONFIG_LOG_ERROR
@@ -20,21 +22,17 @@ extern "C"
fprintf(stderr, "DEBUG (config), " format "\n", ##__VA_ARGS__);
#endif
struct system_config
struct device_config
{
char app_symbol[64];
char dev_symbol[64];
uint8_t device_base;
uint8_t device_offset;
uint8_t nr_threads;
uint16_t cpu_mask[MAX_THREAD_NUM];
};
struct config
{
struct system_config sys_cfg;
struct device_config dev_cfg;
struct packet_io_config pkt_io_cfg;
struct ip_reassemble_config ip_reass_cfg;
struct session_manager_config sess_mgr_cfg;
};

7
src/file/CMakeLists.txt Normal file
View File

@@ -0,0 +1,7 @@
###############################################################################
# file
###############################################################################
add_library(file file_scan.cpp)
target_include_directories(file PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(file log)

69
src/file/file_scan.cpp Normal file
View File

@@ -0,0 +1,69 @@
#include <errno.h>
#include <dirent.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "file_scan.h"
int file_scan(const char *dir, file_handle *cb, void *arg)
{
struct stat statbuf;
struct dirent *entry;
DIR *dp = opendir(dir);
if (NULL == dp)
{
FILE_SCAN_LOG_ERROR("opendir %s failed, %s", dir, strerror(errno));
return -1;
}
if (chdir(dir) == -1)
{
FILE_SCAN_LOG_ERROR("chdir %s failed, %s", dir, strerror(errno));
goto error_out;
}
while ((entry = readdir(dp)))
{
if (lstat(entry->d_name, &statbuf) == -1)
{
FILE_SCAN_LOG_ERROR("lstat %s failed, %s", entry->d_name, strerror(errno));
goto error_out;
}
if (S_IFDIR & statbuf.st_mode)
{
if (!strcmp(".", entry->d_name) || !strcmp("..", entry->d_name))
{
continue;
}
if (file_scan(entry->d_name, cb, arg) == -1)
{
goto error_out;
}
}
else
{
if (cb(entry->d_name, arg) == -1)
{
goto error_out;
}
}
}
if (chdir("..") == -1)
{
FILE_SCAN_LOG_ERROR("chdir .. failed, %s", strerror(errno));
goto error_out;
}
closedir(dp);
return 0;
error_out:
closedir(dp);
return -1;
}

21
src/file/file_scan.h Normal file
View File

@@ -0,0 +1,21 @@
#ifndef _FILE_SCAN_H
#define _FILE_SCAN_H
#ifdef __cpluscplus
extern "C"
{
#endif
#include "log.h"
#define FILE_SCAN_LOG_ERROR(format, ...) LOG_ERROR("file_scan", format, ##__VA_ARGS__)
#define FILE_SCAN_LOG_DEBUG(format, ...) LOG_DEBUG("file_scan", format, ##__VA_ARGS__)
typedef int file_handle(const char *file, void *arg);
int file_scan(const char *dir, file_handle *cb, void *arg);
#ifdef __cpluscplus
}
#endif
#endif

View File

@@ -9,7 +9,7 @@
#include "checksum.h"
#include "ipv4_utils.h"
#include "ipv6_utils.h"
#include "packet_helpers.h"
#include "packet_utils.h"
#include "ip_reassemble.h"
#define IPV4_KEYLEN 1
@@ -636,15 +636,15 @@ static struct packet *ip_frag_reassemble(struct ip_reassemble_manager *mgr, stru
// calculate the length of the reassembled packet
uint32_t packet_len = flow->expected_total_size + flow->hdr.hdr_len;
char *buff = (char *)calloc(1, packet_len + sizeof(struct packet));
if (buff == NULL)
struct packet *pkt = packet_new(packet_len);
if (pkt == NULL)
{
IP_REASSEMBLE_ERROR("unable to allocate memory");
return NULL;
}
char *ptr = buff + sizeof(struct packet);
char *end = ptr + packet_len;
char *ptr = (char *)packet_get_data(pkt);
char *end = ptr + packet_get_len(pkt);
// copy last frag
if (last->len > end - ptr)
@@ -732,17 +732,19 @@ static struct packet *ip_frag_reassemble(struct ip_reassemble_manager *mgr, stru
}
// create a new packet
packet_parse((struct packet *)buff, buff + sizeof(struct packet), packet_len);
return (struct packet *)buff;
packet_parse(pkt, ptr, packet_len);
packet_set_action(pkt, PACKET_ACTION_DROP);
return pkt;
error_out_invalid_length:
ip_reassemble_stat_inc(mgr, fail_invalid_length, &flow->key);
free(buff);
packet_free(pkt);
return NULL;
error_out_overlap:
ip_reassemble_stat_inc(mgr, fail_overlap, &flow->key);
free(buff);
packet_free(pkt);
return NULL;
}

View File

@@ -13,7 +13,7 @@ extern "C"
#include "tcp_utils.h"
#include "ipv4_utils.h"
#include "ipv6_utils.h"
#include "packet_helpers.h"
#include "packet_utils.h"
#include "ip_reassemble.h"
static inline void packet_set_ipv4_src_addr(struct packet *pkt, uint32_t saddr)

View File

@@ -2,7 +2,7 @@
# packet
###############################################################################
add_library(packet packet.cpp)
add_library(packet packet.cpp packet_utils.cpp)
target_include_directories(packet PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(packet PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash)
target_link_libraries(packet tuple log)

View File

@@ -1115,7 +1115,6 @@ const char *packet_parse(struct packet *handler, const char *data, uint16_t len)
handler->data_ptr = data;
handler->data_len = len;
handler->domain = 0;
handler->user_data = NULL;
// TESTED
return parse_ether(handler, data, len);

View File

@@ -14,7 +14,7 @@ extern "C"
#define PACKET_MAX_LAYERS 16
#define PACKET_LOG_ERROR(format, ...) LOG_ERROR("packet", format, ##__VA_ARGS__)
#define PACKET_LOG_DEBUG(format, ...) void(0)
//#define PACKET_LOG_DEBUG(format, ...) LOG_DEBUG("packet", format, ##__VA_ARGS__)
// #define PACKET_LOG_DEBUG(format, ...) LOG_DEBUG("packet", format, ##__VA_ARGS__)
enum layer_type
{
@@ -71,6 +71,9 @@ struct layer_record
uint16_t pld_len; // payload length
};
#define MAX_SID_NUM 8
#define MAX_ROUTE_LEN 64
struct packet
{
struct layer_record layers[PACKET_MAX_LAYERS];
@@ -81,8 +84,21 @@ struct packet
const char *data_ptr;
uint16_t data_len;
uint64_t domain;
bool need_free;
const void *user_data;
// metadata
void *io_ctx;
uint16_t sid_list[MAX_SID_NUM];
uint16_t sid_used;
char route_ctx[MAX_ROUTE_LEN];
uint16_t route_len;
uint64_t session_id;
uint8_t direction;
uint8_t type;
uint8_t action;
};
// return innermost payload

View File

@@ -1,158 +0,0 @@
#ifndef _PACKET_HELPERS_H
#define _PACKET_HELPERS_H
#ifdef __cpluscplus
extern "C"
{
#endif
#include <stdlib.h>
#include <string.h>
#include <netinet/ip.h>
#include <netinet/ip6.h>
#include "packet.h"
#include "ipv4_utils.h"
#include "ipv6_utils.h"
struct metadata
{
char data[64];
// TODO
};
/******************************************************************************
* metadata
******************************************************************************/
static inline struct metadata *metadata_dup(const struct metadata *metadata)
{
if (metadata == NULL)
{
return NULL;
}
struct metadata *metadata_dup = (struct metadata *)calloc(1, sizeof(struct metadata));
if (metadata_dup == NULL)
{
return NULL;
}
memcpy(metadata_dup, metadata, sizeof(struct metadata));
return metadata_dup;
}
static inline void metadata_free(struct metadata *metadata)
{
if (metadata)
{
free(metadata);
metadata = NULL;
}
}
static inline void packet_set0_metadata(struct packet *pkt, const struct metadata *metadata)
{
pkt->user_data = (const void *)metadata;
}
static inline const struct metadata *packet_get0_metadata(const struct packet *pkt)
{
return (const struct metadata *)pkt->user_data;
}
/******************************************************************************
* packet
******************************************************************************/
static inline bool paket_is_fragment(const struct packet *pkt)
{
for (int8_t i = 0; i < pkt->layers_used; i++)
{
if (pkt->layers[i].type == LAYER_TYPE_IPV4)
{
struct ip *ip_hdr = (struct ip *)pkt->layers[i].hdr_ptr;
if (ipv4_hdr_get_mf_flag(ip_hdr) || ipv4_hdr_get_frag_offset(ip_hdr))
{
return true;
}
}
if (pkt->layers[i].type == LAYER_TYPE_IPV6)
{
const struct ip6_hdr *ip6_hdr = (const struct ip6_hdr *)pkt->layers[i].hdr_ptr;
if (ipv6_hdr_get_next_header(ip6_hdr) == IPPROTO_FRAGMENT)
{
return true;
}
}
}
return false;
}
static inline struct packet *packet_dup(const struct packet *pkt)
{
if (pkt == NULL)
{
return NULL;
}
struct packet *pkt_dup = (struct packet *)calloc(1, sizeof(struct packet) + pkt->data_len);
if (pkt_dup == NULL)
{
return NULL;
}
memcpy(pkt_dup, pkt, sizeof(struct packet));
pkt_dup->user_data = NULL;
pkt_dup->data_ptr = (const char *)pkt_dup + sizeof(struct packet);
if (pkt_dup->data_ptr == NULL)
{
free(pkt_dup);
return NULL;
}
memcpy((char *)pkt_dup->data_ptr, pkt->data_ptr, pkt->data_len);
for (int8_t i = 0; i < pkt->layers_used; i++)
{
pkt_dup->layers[i].hdr_ptr = pkt_dup->data_ptr + pkt->layers[i].hdr_offset;
pkt_dup->layers[i].pld_ptr = pkt_dup->data_ptr + pkt->layers[i].hdr_offset + pkt->layers[i].hdr_len;
}
if (pkt->frag_layer)
{
pkt_dup->frag_layer = &pkt_dup->layers[pkt->frag_layer - pkt->layers];
}
packet_set0_metadata(pkt_dup, metadata_dup(packet_get0_metadata(pkt)));
return pkt_dup;
}
static inline void packet_free(struct packet *pkt)
{
if (pkt)
{
metadata_free((struct metadata *)packet_get0_metadata(pkt));
packet_set0_metadata(pkt, NULL);
free(pkt);
pkt = NULL;
}
}
static inline uint16_t packet_get_len(const struct packet *pkt)
{
return pkt->data_len;
}
static inline const char *packet_get_data(const struct packet *pkt)
{
return pkt->data_ptr;
}
#ifdef __cpluscplus
}
#endif
#endif

202
src/packet/packet_utils.cpp Normal file
View File

@@ -0,0 +1,202 @@
#include "packet_utils.h"
void packet_set_io_ctx(struct packet *pkt, void *ctx)
{
pkt->io_ctx = ctx;
}
void *packet_get_io_ctx(const struct packet *pkt)
{
return pkt->io_ctx;
}
// return 0 success; return -1 failed
int packet_set_sid(struct packet *pkt, uint16_t *sid, int num)
{
if (num > MAX_SID_NUM)
{
return -1;
}
pkt->sid_used = num;
memcpy(pkt->sid_list, sid, num * sizeof(uint16_t));
return 0;
}
// return number of sid
int packet_get_sid(const struct packet *pkt, uint16_t *sid, int size)
{
if (size < pkt->sid_used)
{
return 0;
}
memcpy(sid, pkt->sid_list, pkt->sid_used * sizeof(uint16_t));
return pkt->sid_used;
}
// return 0 success; return -1 failed
int packet_prepend_sid(struct packet *pkt, uint16_t sid)
{
if (pkt->sid_used >= MAX_SID_NUM)
{
return -1;
}
memmove(pkt->sid_list + 1, pkt->sid_list, pkt->sid_used * sizeof(uint16_t));
pkt->sid_list[0] = sid;
pkt->sid_used++;
return 0;
}
// return 0 success; return -1 failed
int packet_append_sid(struct packet *pkt, uint16_t sid)
{
if (pkt->sid_used >= MAX_SID_NUM)
{
return -1;
}
pkt->sid_list[pkt->sid_used] = sid;
pkt->sid_used++;
return 0;
}
// return 0 success; return -1 failed
int packet_set_route_ctx(struct packet *pkt, const char *route, int len)
{
if (len > MAX_ROUTE_LEN)
{
return -1;
}
memcpy(pkt->route_ctx, route, len);
pkt->route_len = len;
return 0;
}
// return len of route ctx
int packet_get_route_ctx(const struct packet *pkt, char *buff, int size)
{
if (pkt->route_len > size)
{
return 0;
}
memcpy(buff, pkt->route_ctx, pkt->route_len);
return pkt->route_len;
}
void packet_set_session_id(struct packet *pkt, uint64_t session_id)
{
pkt->session_id = session_id;
}
uint64_t packet_get_session_id(const struct packet *pkt)
{
return pkt->session_id;
}
void packet_set_direction(struct packet *pkt, uint8_t direction)
{
pkt->direction = direction;
}
uint8_t packet_get_direction(const struct packet *pkt)
{
return pkt->direction;
}
void packet_set_type(struct packet *pkt, uint8_t type)
{
pkt->type = type;
}
uint8_t packet_get_type(const struct packet *pkt)
{
return pkt->type;
}
void packet_set_action(struct packet *pkt, uint8_t action)
{
pkt->action = action;
}
uint8_t packet_get_action(const struct packet *pkt)
{
return pkt->action;
}
struct packet *packet_new(uint16_t len)
{
struct packet *pkt = (struct packet *)calloc(1, sizeof(struct packet) + len);
if (pkt == NULL)
{
return NULL;
}
pkt->data_len = len;
pkt->data_ptr = (const char *)pkt + sizeof(struct packet);
pkt->need_free = true;
return pkt;
}
void packet_free(struct packet *pkt)
{
if (pkt && pkt->need_free)
{
free(pkt);
pkt = NULL;
}
}
struct packet *packet_dup(const struct packet *pkt)
{
if (pkt == NULL)
{
return NULL;
}
struct packet *pkt_dup = packet_new(pkt->data_len);
if (pkt_dup == NULL)
{
return NULL;
}
memcpy(pkt_dup, pkt, sizeof(struct packet));
memcpy((char *)pkt_dup->data_ptr, pkt->data_ptr, pkt->data_len);
pkt_dup->need_free = true;
// update layers
for (int8_t i = 0; i < pkt->layers_used; i++)
{
pkt_dup->layers[i].hdr_ptr = pkt_dup->data_ptr + pkt->layers[i].hdr_offset;
pkt_dup->layers[i].pld_ptr = pkt_dup->data_ptr + pkt->layers[i].hdr_offset + pkt->layers[i].hdr_len;
}
// update frag_layer
if (pkt->frag_layer)
{
pkt_dup->frag_layer = &pkt_dup->layers[pkt->frag_layer - pkt->layers];
}
return pkt_dup;
}
const char *packet_get_data(const struct packet *pkt)
{
return pkt->data_ptr;
}
uint16_t packet_get_len(const struct packet *pkt)
{
return pkt->data_len;
}
bool paket_is_fragment(const struct packet *pkt)
{
if (pkt->frag_layer)
{
return true;
}
else
{
return false;
}
}

59
src/packet/packet_utils.h Normal file
View File

@@ -0,0 +1,59 @@
#ifndef _PACKET_UTILS_H
#define _PACKET_UTILS_H
#ifdef __cpluscplus
extern "C"
{
#endif
#include <stdlib.h>
#include <string.h>
#include "packet.h"
void packet_set_io_ctx(struct packet *pkt, void *ctx);
void *packet_get_io_ctx(const struct packet *pkt);
int packet_set_sid(struct packet *pkt, uint16_t *sid, int num); // return 0 success; return -1 failed
int packet_get_sid(const struct packet *pkt, uint16_t *sid, int size); // return number of sid
int packet_prepend_sid(struct packet *pkt, uint16_t sid); // return 0 success; return -1 failed
int packet_append_sid(struct packet *pkt, uint16_t sid); // return 0 success; return -1 failed
int packet_set_route_ctx(struct packet *pkt, const char *route, int len); // return 0 success; return -1 failed
int packet_get_route_ctx(const struct packet *pkt, char *buff, int size); // return len of route ctx
void packet_set_session_id(struct packet *pkt, uint64_t session_id);
uint64_t packet_get_session_id(const struct packet *pkt);
#define PACKET_DIRECTION_I2E 0
#define PACKET_DIRECTION_E2I 1
void packet_set_direction(struct packet *pkt, uint8_t direction);
uint8_t packet_get_direction(const struct packet *pkt);
#define PACKET_TYPE_DATA 0
#define PACKET_TYPE_CTRL 1
void packet_set_type(struct packet *pkt, uint8_t type);
uint8_t packet_get_type(const struct packet *pkt);
#define PACKET_ACTION_FORWARD 0
#define PACKET_ACTION_DROP 1
void packet_set_action(struct packet *pkt, uint8_t action);
uint8_t packet_get_action(const struct packet *pkt);
struct packet *packet_new(uint16_t len);
void packet_free(struct packet *pkt);
struct packet *packet_dup(const struct packet *pkt);
const char *packet_get_data(const struct packet *pkt);
uint16_t packet_get_len(const struct packet *pkt);
bool paket_is_fragment(const struct packet *pkt);
#ifdef __cpluscplus
}
#endif
#endif

View File

@@ -17,8 +17,8 @@ target_link_libraries(gtest_ipv4_utils packet gtest)
add_executable(gtest_ipv6_utils gtest_ipv6_utils.cpp)
target_link_libraries(gtest_ipv6_utils packet gtest)
add_executable(gtest_packet_helpers gtest_packet_helpers.cpp)
target_link_libraries(gtest_packet_helpers packet gtest)
add_executable(gtest_packet_utils gtest_packet_utils.cpp)
target_link_libraries(gtest_packet_utils packet gtest)
include(GoogleTest)
gtest_discover_tests(gtest_packet)
@@ -26,4 +26,4 @@ gtest_discover_tests(gtest_udp_utils)
gtest_discover_tests(gtest_tcp_utils)
gtest_discover_tests(gtest_ipv4_utils)
gtest_discover_tests(gtest_ipv6_utils)
gtest_discover_tests(gtest_packet_helpers)
gtest_discover_tests(gtest_packet_utils)

View File

@@ -1,6 +1,6 @@
#include <gtest/gtest.h>
#include "packet_helpers.h"
#include "packet_utils.h"
/******************************************************************************
* [Protocols in frame: eth:ethertype:ip:data]

View File

@@ -0,0 +1,8 @@
###############################################################################
# packet_io
###############################################################################
add_library(packet_io packet_queue.cpp packet_io.cpp packet_io_dumpfile.cpp packet_io_marsio.cpp)
target_include_directories(packet_io PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(packet_io PUBLIC ${CMAKE_SOURCE_DIR}/src/stellar)
target_link_libraries(packet_io mrzcpd packet file pcap)

120
src/packet_io/packet_io.cpp Normal file
View File

@@ -0,0 +1,120 @@
#include <stdlib.h>
#include <string.h>
#include "packet_io.h"
#include "packet_io_marsio.h"
#include "packet_io_dumpfile.h"
typedef void *on_create(void *config);
typedef void on_destroy(void *handle);
typedef void *on_stat(void *handle);
typedef int on_init(void *handle, uint16_t thread_id);
typedef int on_recv(void *handle, uint16_t thread_id, struct packet **pkt);
typedef void on_send(void *handle, uint16_t thread_id, struct packet *pkt);
struct packet_io
{
void *handle;
on_create *create;
on_destroy *destroy;
on_stat *stat;
on_init *init;
on_recv *recv;
on_send *send;
};
struct packet_io *packet_io_create(struct packet_io_config *config)
{
struct packet_io *handle = (struct packet_io *)calloc(1, sizeof(struct packet_io));
if (handle == NULL)
{
PACKET_IO_LOG_ERROR("unable to alloc packet io");
return NULL;
}
struct packet_io_marsio_confg marsio_config;
strncpy(marsio_config.app_symbol, config->app_symbol, sizeof(marsio_config.app_symbol));
strncpy(marsio_config.dev_symbol, config->dev_symbol, sizeof(marsio_config.dev_symbol));
memcpy(marsio_config.cpu_mask, config->cpu_mask, sizeof(marsio_config.cpu_mask));
marsio_config.nr_threads = config->nr_threads;
struct packet_io_dumpfile_confg dumpfile_config;
strncpy(dumpfile_config.dumpfile_dir, config->dumpfile_dir, sizeof(dumpfile_config.dumpfile_dir));
dumpfile_config.nr_threads = config->nr_threads;
void *_config = NULL;
if (config->mode == PACKET_IO_MARSIO)
{
_config = &marsio_config;
handle->create = (on_create *)packet_io_marsio_create;
handle->destroy = (on_destroy *)packet_io_marsio_destory;
handle->stat = (on_stat *)packet_io_marsio_stat;
handle->init = (on_init *)packet_io_marsio_init;
handle->recv = (on_recv *)packet_io_marsio_recv;
handle->send = (on_send *)packet_io_marsio_send;
}
else
{
_config = &dumpfile_config;
handle->create = (on_create *)packet_io_dumpfile_create;
handle->destroy = (on_destroy *)packet_io_dumpfile_destory;
handle->stat = (on_stat *)packet_io_dumpfile_stat;
handle->init = (on_init *)packet_io_dumpfile_init;
handle->recv = (on_recv *)packet_io_dumpfile_recv;
handle->send = (on_send *)packet_io_dumpfile_send;
}
handle->handle = handle->create(_config);
if (handle->handle == NULL)
{
goto error_out;
}
return handle;
error_out:
packet_io_destroy(handle);
return NULL;
}
void packet_io_destroy(struct packet_io *handle)
{
if (handle)
{
handle->destroy(handle->handle);
free(handle);
handle = NULL;
}
}
struct packet_io_stat *packet_io_get_stat(struct packet_io *handle)
{
return (struct packet_io_stat *)handle->stat(handle->handle);
}
void packet_io_print_stat(struct packet_io *handle)
{
struct packet_io_stat *stat = packet_io_get_stat(handle);
PACKET_IO_LOG_DEBUG("rx_pkts : %lu, rx_bytes : %lu", stat->rx_pkts, stat->rx_bytes);
PACKET_IO_LOG_DEBUG("tx_pkts : %lu, tx_bytes : %lu", stat->tx_pkts, stat->tx_bytes);
PACKET_IO_LOG_DEBUG("drop_pkts : %lu, drop_bytes : %lu", stat->drop_pkts, stat->drop_bytes);
PACKET_IO_LOG_DEBUG("inject_pkts : %lu, inject_bytes : %lu", stat->inject_pkts, stat->inject_bytes);
PACKET_IO_LOG_DEBUG("keepalive_pkts : %lu, keepalive_bytes : %lu", stat->keepalive_pkts, stat->keepalive_bytes);
}
int packet_io_init(struct packet_io *handle, uint16_t thread_id)
{
return handle->init(handle->handle, thread_id);
}
int packet_io_recv(struct packet_io *handle, uint16_t thread_id, struct packet **pkt)
{
return handle->recv(handle->handle, thread_id, pkt);
}
void packet_io_send(struct packet_io *handle, uint16_t thread_id, struct packet *pkt)
{
handle->send(handle->handle, thread_id, pkt);
}

70
src/packet_io/packet_io.h Normal file
View File

@@ -0,0 +1,70 @@
#ifndef _PACKET_IO_H
#define _PACKET_IO_H
#ifdef __cpluscplus
extern "C"
{
#endif
#include "packet.h"
#include "stellar.h"
#define PACKET_IO_LOG_STATE(format, ...) LOG_STATE("packet_io", format, ##__VA_ARGS__)
#define PACKET_IO_LOG_ERROR(format, ...) LOG_ERROR("packet_io", format, ##__VA_ARGS__)
#define PACKET_IO_LOG_DEBUG(format, ...) LOG_DEBUG("packet_io", format, ##__VA_ARGS__)
struct packet_io_stat
{
uint64_t rx_pkts;
uint64_t tx_pkts;
uint64_t rx_bytes;
uint64_t tx_bytes;
uint64_t keepalive_pkts;
uint64_t keepalive_bytes;
uint64_t drop_pkts;
uint64_t drop_bytes;
uint64_t inject_pkts;
uint64_t inject_bytes;
};
enum packet_io_mode
{
PACKET_IO_DUMPFILE = 0,
PACKET_IO_MARSIO = 1,
};
struct packet_io_config
{
enum packet_io_mode mode;
// for dumpfile
char dumpfile_dir[256];
// for marsio
char app_symbol[64];
char dev_symbol[64];
uint8_t nr_threads;
uint16_t cpu_mask[MAX_THREAD_NUM];
};
struct packet_io;
struct packet_io *packet_io_create(struct packet_io_config *config);
void packet_io_destroy(struct packet_io *handle);
struct packet_io_stat *packet_io_get_stat(struct packet_io *handle);
void packet_io_print_stat(struct packet_io *handle);
// return 0 if success, -1 if failed
int packet_io_init(struct packet_io *handle, uint16_t thread_id);
int packet_io_recv(struct packet_io *handle, uint16_t thread_id, struct packet **pkt);
void packet_io_send(struct packet_io *handle, uint16_t thread_id, struct packet *pkt);
#ifdef __cpluscplus
}
#endif
#endif

View File

@@ -0,0 +1,191 @@
#include <pcap/pcap.h>
#include <pthread.h>
#include <unistd.h>
#include "stellar.h"
#include "file_scan.h"
#include "packet_io.h"
#include "packet_utils.h"
#include "packet_queue.h"
#include "packet_io_dumpfile.h"
#define MAX_PACKET_QUEUE_SIZE (4096 * 1000)
struct packet_io_dumpfile
{
uint8_t nr_threads;
char dumpfile_dir[256];
pcap_t *pcap;
struct packet_queue *queue[MAX_THREAD_NUM];
struct packet_io_stat stat;
uint64_t io_thread_need_exit;
uint64_t io_thread_is_runing;
};
/******************************************************************************
* Private API
******************************************************************************/
static void pcap_handle(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes)
{
struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)user;
struct packet *pkt = packet_new(h->caplen);
if (pkt == NULL)
{
PACKET_IO_LOG_ERROR("unable to alloc packet");
return;
}
memcpy((char *)pkt->data_ptr, bytes, h->caplen);
packet_parse(pkt, pkt->data_ptr, h->caplen);
uint64_t hash = packet_get_hash(pkt, LDBC_METHOD_HASH_INT_IP_AND_EXT_IP, 0);
struct packet_queue *queue = handle->queue[hash % handle->nr_threads];
packet_queue_push(queue, pkt);
if (ATOMIC_READ(&handle->io_thread_need_exit))
{
PACKET_IO_LOG_STATE("dumpfile io thread need exit");
pcap_breakloop(handle->pcap);
}
}
static int dumpfile_handle(const char *file, void *arg)
{
struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)arg;
PACKET_IO_LOG_STATE("dumpfile %s inprocessing", file)
handle->pcap = pcap_open_offline(file, NULL);
if (handle->pcap == NULL)
{
PACKET_IO_LOG_ERROR("unable to open pcap file: %s", file);
return -1;
}
pcap_loop(handle->pcap, -1, pcap_handle, (u_char *)handle);
pcap_close(handle->pcap);
PACKET_IO_LOG_STATE("dumpfile %s processed", file)
return 0;
}
static void *dumpfile_thread_cycle(void *arg)
{
struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)arg;
ATOMIC_SET(&handle->io_thread_is_runing, 1);
PACKET_IO_LOG_STATE("dumpfile io thread is running");
file_scan(handle->dumpfile_dir, dumpfile_handle, arg);
PACKET_IO_LOG_STATE("dumpfile io thread is exiting");
ATOMIC_SET(&handle->io_thread_is_runing, 0);
return NULL;
}
/******************************************************************************
* Public API
******************************************************************************/
struct packet_io_dumpfile *packet_io_dumpfile_create(struct packet_io_dumpfile_confg *config)
{
pthread_t tid;
struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)calloc(1, sizeof(struct packet_io_dumpfile));
if (handle == NULL)
{
PACKET_IO_LOG_ERROR("unable to allocate memory for packet_io_dumpfile");
return NULL;
}
handle->nr_threads = config->nr_threads;
strncpy(handle->dumpfile_dir, config->dumpfile_dir, sizeof(handle->dumpfile_dir));
for (uint16_t i = 0; i < handle->nr_threads; i++)
{
handle->queue[i] = packet_queue_create(MAX_PACKET_QUEUE_SIZE);
if (handle->queue[i] == NULL)
{
PACKET_IO_LOG_ERROR("unable to create packet queue");
goto error_out;
}
}
if (pthread_create(&tid, NULL, dumpfile_thread_cycle, (void *)handle) != 0)
{
PACKET_IO_LOG_ERROR("unable to create packet io thread");
goto error_out;
}
return handle;
error_out:
packet_io_dumpfile_destory(handle);
return NULL;
}
void packet_io_dumpfile_destory(struct packet_io_dumpfile *handle)
{
if (handle)
{
ATOMIC_SET(&handle->io_thread_need_exit, 1);
while (ATOMIC_READ(&handle->io_thread_is_runing))
{
usleep(1000);
}
for (uint16_t i = 0; i < handle->nr_threads; i++)
{
packet_queue_destory(handle->queue[i]);
}
free(handle);
handle = NULL;
}
}
struct packet_io_stat *packet_io_dumpfile_stat(struct packet_io_dumpfile *handle)
{
return &handle->stat;
}
int packet_io_dumpfile_init(struct packet_io_dumpfile *handle, uint16_t thread_id)
{
return 0;
}
int packet_io_dumpfile_recv(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet **pkt)
{
struct packet_queue *queue = handle->queue[thread_id];
packet_queue_pop(queue, pkt);
if (*pkt == NULL)
{
return -1;
}
else
{
ATOMIC_ADD(&handle->stat.rx_pkts, 1);
ATOMIC_ADD(&handle->stat.rx_bytes, packet_get_len(*pkt));
return 0;
}
}
void packet_io_dumpfile_send(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkt)
{
if (packet_get_action(pkt) == PACKET_ACTION_DROP)
{
ATOMIC_ADD(&handle->stat.drop_pkts, 1);
ATOMIC_ADD(&handle->stat.drop_bytes, packet_get_len(pkt));
}
else
{
ATOMIC_ADD(&handle->stat.tx_pkts, 1);
ATOMIC_ADD(&handle->stat.tx_bytes, packet_get_len(pkt));
}
packet_free(pkt);
}

View File

@@ -0,0 +1,31 @@
#ifndef _PACKET_IO_DUMPFILE_H
#define _PACKET_IO_DUMPFILE_H
#ifdef __cpluscplus
extern "C"
{
#endif
#include "packet.h"
struct packet_io_dumpfile_confg
{
char dumpfile_dir[256];
uint8_t nr_threads;
};
struct packet_io_dumpfile;
struct packet_io_dumpfile *packet_io_dumpfile_create(struct packet_io_dumpfile_confg *config);
void packet_io_dumpfile_destory(struct packet_io_dumpfile *handle);
struct packet_io_stat *packet_io_dumpfile_stat(struct packet_io_dumpfile *handle);
int packet_io_dumpfile_init(struct packet_io_dumpfile *handle, uint16_t thread_id);
int packet_io_dumpfile_recv(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet **pkt);
void packet_io_dumpfile_send(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkt);
#ifdef __cpluscplus
}
#endif
#endif

View File

@@ -0,0 +1,242 @@
#include <sched.h>
#include <netinet/ether.h>
#include "stellar.h"
#include "marsio.h"
#include "packet_io.h"
#include "packet_utils.h"
#include "packet_io_marsio.h"
struct packet_io_marsio
{
struct mr_instance *mr_ins;
struct mr_vdev *mr_dev;
struct mr_sendpath *mr_path;
struct packet_io_stat stat;
};
/******************************************************************************
* Private API
******************************************************************************/
static void metadata_to_packet(marsio_buff_t *mbuff, struct packet *pkt)
{
packet_set_io_ctx(pkt, mbuff);
pkt->sid_used = marsio_buff_get_sid_list(mbuff, pkt->sid_list, MAX_SID_NUM);
pkt->route_len = marsio_buff_get_metadata(mbuff, MR_BUFF_ROUTE_CTX, pkt->route_ctx, MAX_ROUTE_LEN);
marsio_buff_get_metadata(mbuff, MR_BUFF_SESSION_ID, &(pkt->session_id), sizeof(pkt->session_id));
marsio_buff_get_metadata(mbuff, MR_BUFF_DIR, &(pkt->direction), sizeof(pkt->direction));
packet_set_type(pkt, marsio_buff_is_ctrlbuf(mbuff) ? PACKET_TYPE_CTRL : PACKET_TYPE_DATA);
packet_set_action(pkt, PACKET_ACTION_FORWARD);
}
static void metadata_to_mbuff(marsio_buff_t *mbuff, struct packet *pkt)
{
marsio_buff_set_sid_list(mbuff, pkt->sid_list, pkt->sid_used);
marsio_buff_set_metadata(mbuff, MR_BUFF_ROUTE_CTX, pkt->route_ctx, pkt->route_len);
marsio_buff_set_metadata(mbuff, MR_BUFF_SESSION_ID, &(pkt->session_id), sizeof(pkt->session_id));
marsio_buff_set_metadata(mbuff, MR_BUFF_DIR, &(pkt->direction), sizeof(pkt->direction));
if (packet_get_type(pkt) == PACKET_TYPE_CTRL)
{
marsio_buff_set_ctrlbuf(mbuff);
}
}
static int is_keepalive_packet(const char *data, int len)
{
if (data == NULL || len < (int)(sizeof(struct ethhdr)))
{
return 0;
}
struct ethhdr *eth_hdr = (struct ethhdr *)data;
if (eth_hdr->h_proto == 0xAAAA)
{
return 1;
}
else
{
return 0;
}
}
/******************************************************************************
* Public API
******************************************************************************/
struct packet_io_marsio *packet_io_marsio_create(struct packet_io_marsio_confg *config)
{
int opt = 1;
cpu_set_t coremask;
CPU_ZERO(&coremask);
for (uint8_t i = 0; i < config->nr_threads; i++)
{
CPU_SET(config->cpu_mask[i], &coremask);
}
struct packet_io_marsio *handle = (struct packet_io_marsio *)calloc(1, sizeof(struct packet_io_marsio));
if (handle == NULL)
{
PACKET_IO_LOG_ERROR("unable to allocate memory for packet_io_marsio");
return NULL;
}
handle->mr_ins = marsio_create();
if (handle->mr_ins == NULL)
{
PACKET_IO_LOG_ERROR("unable to create marsio instance");
goto error_out;
}
marsio_option_set(handle->mr_ins, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &coremask, sizeof(coremask));
marsio_option_set(handle->mr_ins, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt));
if (marsio_init(handle->mr_ins, config->app_symbol) != 0)
{
PACKET_IO_LOG_ERROR("unable to init marsio instance");
goto error_out;
}
handle->mr_dev = marsio_open_device(handle->mr_ins, config->dev_symbol, config->nr_threads, config->nr_threads);
if (handle->mr_dev == NULL)
{
PACKET_IO_LOG_ERROR("unable to open marsio device");
goto error_out;
}
handle->mr_path = marsio_sendpath_create_by_vdev(handle->mr_dev);
if (handle->mr_path == NULL)
{
PACKET_IO_LOG_ERROR("unable to create marsio sendpath");
goto error_out;
}
return handle;
error_out:
packet_io_marsio_destory(handle);
return NULL;
}
void packet_io_marsio_destory(struct packet_io_marsio *handle)
{
if (handle)
{
if (handle->mr_path)
{
marsio_sendpath_destory(handle->mr_path);
handle->mr_path = NULL;
}
if (handle->mr_dev)
{
marsio_close_device(handle->mr_dev);
handle->mr_dev = NULL;
}
if (handle->mr_ins)
{
marsio_destory(handle->mr_ins);
handle->mr_ins = NULL;
}
free(handle);
handle = NULL;
}
}
struct packet_io_stat *packet_io_marsio_stat(struct packet_io_marsio *handle)
{
return &handle->stat;
}
int packet_io_marsio_init(struct packet_io_marsio *handle, uint16_t thread_id)
{
if (marsio_thread_init(handle->mr_ins) != 0)
{
PACKET_IO_LOG_ERROR("unable to init marsio thread");
return -1;
}
return 0;
}
int packet_io_marsio_recv(struct packet_io_marsio *handle, uint16_t thread_id, struct packet **pkt)
{
marsio_buff_t *rx_buff;
marsio_buff_t *rx_buffs[1];
thread_local struct packet thd_pkt;
retry:
if (marsio_recv_burst(handle->mr_dev, thread_id, rx_buffs, 1) <= 0)
{
*pkt = NULL;
return -1;
}
rx_buff = rx_buffs[0];
char *data = marsio_buff_mtod(rx_buff);
int len = marsio_buff_datalen(rx_buff);
ATOMIC_ADD(&handle->stat.rx_pkts, 1);
ATOMIC_ADD(&handle->stat.rx_bytes, len);
if (is_keepalive_packet(data, len))
{
ATOMIC_ADD(&handle->stat.keepalive_pkts, 1);
ATOMIC_ADD(&handle->stat.keepalive_bytes, len);
ATOMIC_ADD(&handle->stat.tx_pkts, 1);
ATOMIC_ADD(&handle->stat.tx_bytes, len);
marsio_send_burst(handle->mr_path, thread_id, rx_buffs, 1);
goto retry;
}
metadata_to_packet(rx_buff, &thd_pkt);
packet_parse(&thd_pkt, data, len);
*pkt = &thd_pkt;
return 0;
}
void packet_io_marsio_send(struct packet_io_marsio *handle, uint16_t thread_id, struct packet *pkt)
{
marsio_buff_t *tx_buff = (marsio_buff_t *)packet_get_io_ctx(pkt);
if (packet_get_action(pkt) == PACKET_ACTION_DROP)
{
if (tx_buff)
{
ATOMIC_ADD(&handle->stat.drop_pkts, 1);
ATOMIC_ADD(&handle->stat.drop_bytes, packet_get_len(pkt));
marsio_buff_free(handle->mr_ins, &tx_buff, 1, 0, thread_id);
}
else
{
// do nothing
}
}
else
{
if (tx_buff == NULL)
{
if (marsio_buff_malloc_global(handle->mr_ins, &tx_buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
{
PACKET_IO_LOG_ERROR("unable to alloc tx buffer");
return;
}
ATOMIC_ADD(&handle->stat.inject_pkts, 1);
ATOMIC_ADD(&handle->stat.inject_bytes, packet_get_len(pkt));
char *dst = marsio_buff_append(tx_buff, packet_get_len(pkt));
memcpy(dst, packet_get_data(pkt), packet_get_len(pkt));
}
ATOMIC_ADD(&handle->stat.tx_pkts, 1);
ATOMIC_ADD(&handle->stat.tx_bytes, packet_get_len(pkt));
metadata_to_mbuff(tx_buff, pkt);
marsio_send_burst(handle->mr_path, thread_id, &tx_buff, 1);
}
packet_free(pkt);
}

View File

@@ -0,0 +1,33 @@
#ifndef _PACKET_IO_MARSIO_H
#define _PACKET_IO_MARSIO_H
#ifdef __cpluscplus
extern "C"
{
#endif
#include "packet.h"
struct packet_io_marsio_confg
{
char app_symbol[64];
char dev_symbol[64];
uint8_t nr_threads;
uint16_t cpu_mask[MAX_THREAD_NUM];
};
struct packet_io_marsio;
struct packet_io_marsio *packet_io_marsio_create(struct packet_io_marsio_confg *config);
void packet_io_marsio_destory(struct packet_io_marsio *handle);
struct packet_io_stat *packet_io_marsio_stat(struct packet_io_marsio *handle);
int packet_io_marsio_init(struct packet_io_marsio *handle, uint16_t thread_id);
int packet_io_marsio_recv(struct packet_io_marsio *handle, uint16_t thread_id, struct packet **pkt);
void packet_io_marsio_send(struct packet_io_marsio *handle, uint16_t thread_id, struct packet *pkt);
#ifdef __cpluscplus
}
#endif
#endif

View File

@@ -0,0 +1,112 @@
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include "packet_io.h"
#include "packet_utils.h"
#include "packet_queue.h"
struct packet_queue
{
pthread_mutex_t lock;
struct packet **queue;
uint32_t size;
uint32_t head;
uint32_t tail;
};
struct packet_queue *packet_queue_create(uint32_t size)
{
struct packet_queue *queue = (struct packet_queue *)calloc(1, sizeof(struct packet_queue));
if (queue == NULL)
{
PACKET_IO_LOG_ERROR("unable to alloc packet queue");
return NULL;
}
queue->queue = (struct packet **)calloc(size, sizeof(struct packet *));
if (queue->queue == NULL)
{
PACKET_IO_LOG_ERROR("unable to alloc packet queue buffer");
free(queue);
return NULL;
}
queue->size = size;
queue->head = 0;
queue->tail = 0;
pthread_mutex_init(&queue->lock, NULL);
return queue;
}
void packet_queue_destory(struct packet_queue *queue)
{
if (queue == NULL)
{
return;
}
struct packet *pkt = NULL;
while (1)
{
packet_queue_pop(queue, &pkt);
if (pkt == NULL)
{
break;
}
packet_free(pkt);
}
if (queue->queue)
{
free(queue->queue);
queue->queue = NULL;
}
pthread_mutex_destroy(&queue->lock);
free(queue);
}
int packet_queue_is_full(struct packet_queue *queue)
{
return (queue->tail + 1) % queue->size == queue->head;
}
int packet_queue_is_empty(struct packet_queue *queue)
{
return queue->head == queue->tail;
}
void packet_queue_push(struct packet_queue *queue, struct packet *pkt)
{
retry:
pthread_mutex_lock(&queue->lock);
if (packet_queue_is_full(queue))
{
PACKET_IO_LOG_ERROR("packet queue is full, retry later");
pthread_mutex_unlock(&queue->lock);
sleep(1);
goto retry;
}
queue->queue[queue->tail] = pkt;
queue->tail = (queue->tail + 1) % queue->size;
pthread_mutex_unlock(&queue->lock);
}
void packet_queue_pop(struct packet_queue *queue, struct packet **pkt)
{
pthread_mutex_lock(&queue->lock);
if (packet_queue_is_empty(queue))
{
pthread_mutex_unlock(&queue->lock);
*pkt = NULL;
return;
}
*pkt = queue->queue[queue->head];
queue->head = (queue->head + 1) % queue->size;
pthread_mutex_unlock(&queue->lock);
}

View File

@@ -0,0 +1,26 @@
#ifndef _PACKET_QUEUE_H
#define _PACKET_QUEUE_H
#ifdef __cpluscplus
extern "C"
{
#endif
#include <stdint.h>
struct packet_queue;
struct packet_queue *packet_queue_create(uint32_t size);
void packet_queue_destory(struct packet_queue *queue);
int packet_queue_is_full(struct packet_queue *queue);
int packet_queue_is_empty(struct packet_queue *queue);
void packet_queue_push(struct packet_queue *queue, struct packet *pkt);
void packet_queue_pop(struct packet_queue *queue, struct packet **pkt);
#ifdef __cpluscplus
}
#endif
#endif

View File

@@ -1,7 +1,7 @@
#include <assert.h>
#include "session_private.h"
#include "packet_helpers.h"
#include "packet_utils.h"
#define EX_KEY_MAX_LEN 64

View File

@@ -10,7 +10,7 @@
#include "session_manager.h"
#include "tcp_utils.h"
#include "udp_utils.h"
#include "packet_helpers.h"
#include "packet_utils.h"
#include "dupkt_filter.h"
#include "eviction_filter.h"
#include "id_generator.h"

View File

@@ -1,4 +1,4 @@
add_executable(stellar stellar.cpp)
target_link_libraries(stellar id_generator session_manager pthread config mrzcpd)
target_link_libraries(stellar id_generator session_manager pthread config packet_io)
install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program)

View File

@@ -7,16 +7,16 @@
#include <signal.h>
#include <stdlib.h>
#include <sys/prctl.h>
#include <netinet/ether.h>
#include "logo.h"
#include "stellar.h"
#include "marsio.h"
#include "config.h"
#include "packet.h"
#include "packet_io.h"
#include "timestamp.h"
#include "session_manager.h"
#include "id_generator.h"
#include "ip_reassemble.h"
#include "session_manager.h"
#define STELLAR_LOG_STATE(format, ...) LOG_STATE("stellar", format, ##__VA_ARGS__)
#define STELLAR_LOG_ERROR(format, ...) LOG_ERROR("stellar", format, ##__VA_ARGS__)
@@ -25,13 +25,6 @@
#define ATOMIC_SET(x, y) __atomic_store_n(x, y, __ATOMIC_RELAXED)
#define ATOMIC_READ(x) __atomic_load_n(x, __ATOMIC_RELAXED)
struct packet_io
{
struct mr_instance *mr_ins;
struct mr_vdev *mr_dev;
struct mr_sendpath *mr_path;
};
struct thread_context
{
pthread_t tid;
@@ -39,6 +32,7 @@ struct thread_context
uint64_t need_exit;
uint64_t is_runing;
struct session_manager *sess_mgr;
struct ip_reassemble_manager *ip_mgr;
};
struct stellar_context
@@ -46,12 +40,18 @@ struct stellar_context
uint64_t need_exit;
struct config config;
struct packet_io *pkt_io;
struct packet_io *packet_io;
struct thread_context threads_ctx[MAX_THREAD_NUM];
};
struct stellar_context stellar_context;
struct stellar_context *stellar_ctx_ptr = &stellar_context;
struct stellar_context *stellar_ctx = &stellar_context;
// config
struct device_config *dev_cfg = &stellar_context.config.dev_cfg;
struct packet_io_config *pkt_io_cfg = &stellar_context.config.pkt_io_cfg;
struct ip_reassemble_config *ip_reass_cfg = &stellar_context.config.ip_reass_cfg;
struct session_manager_config *sess_mgr_cfg = &stellar_context.config.sess_mgr_cfg;
static const char *log_config_file = "./conf/log.toml";
static const char *stellar_config_file = "./conf/stellar.toml";
@@ -60,7 +60,7 @@ static const char *stellar_config_file = "./conf/stellar.toml";
* example
******************************************************************************/
static void __packet_plugin_dispatch_example(const struct packet *pkt)
static void __packet_plugin(const struct packet *pkt)
{
if (pkt == NULL)
{
@@ -71,7 +71,7 @@ static void __packet_plugin_dispatch_example(const struct packet *pkt)
printf("<= packet dispatch\n");
}
static void __session_plugin_dispatch_example(struct session *sess)
static void __session_plugin(struct session *sess)
{
if (sess == NULL)
{
@@ -96,19 +96,19 @@ static void signal_handler(int signo)
if (signo == SIGINT)
{
STELLAR_LOG_STATE("SIGINT received, exit !!!");
ATOMIC_SET(&stellar_ctx_ptr->need_exit, 1);
ATOMIC_SET(&stellar_ctx->need_exit, 1);
}
if (signo == SIGQUIT)
{
STELLAR_LOG_STATE("SIGQUIT received, exit !!!");
ATOMIC_SET(&stellar_ctx_ptr->need_exit, 1);
ATOMIC_SET(&stellar_ctx->need_exit, 1);
}
if (signo == SIGTERM)
{
STELLAR_LOG_STATE("SIGTERM received, exit !!!");
ATOMIC_SET(&stellar_ctx_ptr->need_exit, 1);
ATOMIC_SET(&stellar_ctx->need_exit, 1);
}
if (signo == SIGHUP)
@@ -118,26 +118,6 @@ static void signal_handler(int signo)
}
}
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static inline int is_keepalive_packet(const char *data, int len)
{
if (data == NULL || len < (int)(sizeof(struct ethhdr)))
{
return 0;
}
struct ethhdr *eth_hdr = (struct ethhdr *)data;
if (eth_hdr->h_proto == 0xAAAA)
{
return 1;
}
else
{
return 0;
}
}
/******************************************************************************
* thread
******************************************************************************/
@@ -151,27 +131,15 @@ static inline void thread_set_name(const char *thd_symbol, uint16_t thd_idx)
static void *main_loop(void *arg)
{
int n_pkt_recved;
uint16_t len = 0;
const char *data;
struct packet pkt;
struct packet *pkt = NULL;
struct session *sess;
marsio_buff_t *rx_buff;
marsio_buff_t *rx_buffs[RX_BURST_MAX];
struct thread_context *threads_ctx = (struct thread_context *)arg;
struct session_manager *sess_mgr = threads_ctx->sess_mgr;
struct packet_io *pkt_io = stellar_ctx_ptr->pkt_io;
uint16_t thd_idx = threads_ctx->index;
struct packet_io *packet_io = stellar_ctx->packet_io;
struct session_manager *sess_mgr = threads_ctx->sess_mgr;
struct ip_reassemble_manager *ip_mgr = threads_ctx->ip_mgr;
struct mr_vdev *mr_dev = pkt_io->mr_dev;
struct mr_sendpath *mr_path = pkt_io->mr_path;
struct mr_instance *mr_ins = pkt_io->mr_ins;
struct mr_vdev *vdevs[1] = {
mr_dev,
};
int min_timeout_ms = 10;
if (marsio_thread_init(mr_ins) != 0)
if (packet_io_init(packet_io, thd_idx) != 0)
{
STELLAR_LOG_ERROR("unable to init marsio thread");
return NULL;
@@ -183,50 +151,45 @@ static void *main_loop(void *arg)
while (ATOMIC_READ(&threads_ctx->need_exit) == 0)
{
n_pkt_recved = marsio_recv_burst(mr_dev, thd_idx, rx_buffs, RX_BURST_MAX);
if (n_pkt_recved <= 0)
// recv packet
if (packet_io_recv(packet_io, thd_idx, &pkt) != 0)
{
goto poll_wait;
}
for (int i = 0; i < n_pkt_recved; i++)
{
rx_buff = rx_buffs[i];
data = marsio_buff_mtod(rx_buff);
len = marsio_buff_datalen(rx_buff);
// call packet plugin
__packet_plugin(pkt);
if (is_keepalive_packet(data, len))
// ip fragment reassemble
if (pkt->frag_layer)
{
struct packet *temp = ip_reassemble_packet(ip_mgr, pkt);
// forward the original fragment packet
packet_io_send(packet_io, thd_idx, pkt);
if (temp == NULL)
{
marsio_send_burst(mr_path, thd_idx, &rx_buff, 1);
continue;
goto poll_wait;
}
else
{
packet_parse(&pkt, data, len);
__packet_plugin_dispatch_example(&pkt);
sess = session_manager_update_session(sess_mgr, &pkt);
__session_plugin_dispatch_example(sess);
sess = session_manager_get_evicted_session(sess_mgr);
__session_plugin_dispatch_example(sess);
// TODO
if (1) // action == forward
{
marsio_send_burst(mr_path, thd_idx, &rx_buff, 1);
}
else // action == drop
{
marsio_buff_free(mr_ins, &rx_buff, 1, 0, thd_idx);
}
pkt = temp;
}
}
// update session
sess = session_manager_update_session(sess_mgr, pkt);
__session_plugin(sess);
// get evicted session
sess = session_manager_get_evicted_session(sess_mgr);
__session_plugin(sess);
packet_io_send(packet_io, thd_idx, pkt);
poll_wait:
// get expired session
sess = session_manager_get_expired_session(sess_mgr);
__session_plugin_dispatch_example(sess);
marsio_poll_wait(mr_ins, vdevs, 1, thd_idx, min_timeout_ms);
__session_plugin(sess);
}
ATOMIC_SET(&threads_ctx->is_runing, 0);
@@ -235,12 +198,9 @@ static void *main_loop(void *arg)
return NULL;
}
static int thread_context_init(struct stellar_context *ctx)
static int thread_context_init(struct stellar_context *ctx, uint8_t nr_threads)
{
struct system_config *sys_cfg = &ctx->config.sys_cfg;
struct session_manager_config *sess_mgr_cfg = &ctx->config.sess_mgr_cfg;
for (uint8_t i = 0; i < sys_cfg->nr_threads; i++)
for (uint8_t i = 0; i < nr_threads; i++)
{
struct thread_context *threads_ctx = &ctx->threads_ctx[i];
threads_ctx->index = i;
@@ -253,22 +213,28 @@ static int thread_context_init(struct stellar_context *ctx)
STELLAR_LOG_ERROR("unable to create session manager");
return -1;
}
threads_ctx->ip_mgr = ip_reassemble_manager_create(ip_reass_cfg);
if (threads_ctx->ip_mgr == NULL)
{
STELLAR_LOG_ERROR("unable to create ip reassemble manager");
return -1;
}
}
return 0;
}
static void thread_context_free(struct stellar_context *ctx)
static void thread_context_free(struct stellar_context *ctx, uint8_t nr_threads)
{
struct system_config *sys_cfg = &ctx->config.sys_cfg;
for (uint8_t i = 0; i < sys_cfg->nr_threads; i++)
for (uint8_t i = 0; i < nr_threads; i++)
{
struct thread_context *threads_ctx = &ctx->threads_ctx[i];
if (ATOMIC_READ(&threads_ctx->is_runing) == 0)
{
STELLAR_LOG_STATE("wait worker thread %d free context", i);
session_manager_destroy(threads_ctx->sess_mgr);
ip_reassemble_manager_destory(threads_ctx->ip_mgr);
}
}
}
@@ -302,93 +268,13 @@ static void thread_destroy(struct thread_context threads_ctx[], uint8_t nr_threa
}
}
/******************************************************************************
* packet io
******************************************************************************/
void packet_io_destroy(struct packet_io *pkt_io)
{
if (pkt_io == NULL)
{
return;
}
if (pkt_io->mr_path != NULL)
{
marsio_sendpath_destory(pkt_io->mr_path);
pkt_io->mr_path = NULL;
}
if (pkt_io->mr_dev != NULL)
{
marsio_close_device(pkt_io->mr_dev);
pkt_io->mr_dev = NULL;
}
if (pkt_io->mr_ins != NULL)
{
marsio_destory(pkt_io->mr_ins);
pkt_io->mr_ins = NULL;
}
free(pkt_io);
pkt_io = NULL;
}
struct packet_io *packet_io_create(struct system_config *sys_cfg)
{
struct packet_io *pkt_io = (struct packet_io *)calloc(1, sizeof(struct packet_io));
if (pkt_io == NULL)
{
STELLAR_LOG_ERROR("unable to alloc packet io");
return NULL;
}
int opt = 1;
cpu_set_t coremask;
CPU_ZERO(&coremask);
for (uint8_t i = 0; i < sys_cfg->nr_threads; i++)
{
CPU_SET(sys_cfg->cpu_mask[i], &coremask);
}
pkt_io->mr_ins = marsio_create();
if (pkt_io->mr_ins == NULL)
{
STELLAR_LOG_ERROR("unable to create marsio instance");
goto error_out;
}
marsio_option_set(pkt_io->mr_ins, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &coremask, sizeof(coremask));
marsio_option_set(pkt_io->mr_ins, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt));
if (marsio_init(pkt_io->mr_ins, sys_cfg->app_symbol) != 0)
{
STELLAR_LOG_ERROR("unable to init marsio instance");
goto error_out;
}
pkt_io->mr_dev = marsio_open_device(pkt_io->mr_ins, sys_cfg->dev_symbol, sys_cfg->nr_threads, sys_cfg->nr_threads);
if (pkt_io->mr_dev == NULL)
{
STELLAR_LOG_ERROR("unable to open marsio device");
goto error_out;
}
pkt_io->mr_path = marsio_sendpath_create_by_vdev(pkt_io->mr_dev);
if (pkt_io->mr_path == NULL)
{
STELLAR_LOG_ERROR("unable to create marsio sendpath");
goto error_out;
}
return pkt_io;
error_out:
packet_io_destroy(pkt_io);
return NULL;
}
/******************************************************************************
* main
******************************************************************************/
int main(int argc, char **argv)
{
memset(stellar_ctx_ptr, 0, sizeof(struct stellar_context));
memset(stellar_ctx, 0, sizeof(struct stellar_context));
timestamp_update();
signal(SIGINT, signal_handler);
@@ -404,17 +290,15 @@ int main(int argc, char **argv)
STELLAR_LOG_STATE("Start Stellar (version: %s)\n %s", __stellar_version, logo_str);
if (config_load(&stellar_ctx_ptr->config, stellar_config_file) != 0)
if (config_load(&stellar_ctx->config, stellar_config_file) != 0)
{
STELLAR_LOG_ERROR("unable to load config file");
return -1;
}
config_dump(&stellar_ctx_ptr->config);
struct system_config *sys_cfg = &stellar_ctx_ptr->config.sys_cfg;
uint8_t nr_threads = sys_cfg->nr_threads;
config_dump(&stellar_ctx->config);
if (id_generator_init(sys_cfg->device_base, sys_cfg->device_offset) != 0)
if (id_generator_init(dev_cfg->device_base, dev_cfg->device_offset) != 0)
{
STELLAR_LOG_ERROR("unable to init id generator");
return -1;
@@ -422,42 +306,40 @@ int main(int argc, char **argv)
// TODO load plugin
stellar_ctx_ptr->pkt_io = packet_io_create(sys_cfg);
if (stellar_ctx_ptr->pkt_io == NULL)
uint8_t nr_threads = pkt_io_cfg->nr_threads;
stellar_ctx->packet_io = packet_io_create(pkt_io_cfg);
if (stellar_ctx->packet_io == NULL)
{
STELLAR_LOG_ERROR("unable to create packet io");
goto error_out;
}
if (thread_context_init(stellar_ctx_ptr) != 0)
if (thread_context_init(stellar_ctx, nr_threads) != 0)
{
STELLAR_LOG_ERROR("unable to init thread context");
goto error_out;
}
if (thread_create(stellar_ctx_ptr->threads_ctx, nr_threads) != 0)
if (thread_create(stellar_ctx->threads_ctx, nr_threads) != 0)
{
STELLAR_LOG_ERROR("unable to create worker thread");
goto error_out;
}
while (!ATOMIC_READ(&stellar_ctx_ptr->need_exit))
while (!ATOMIC_READ(&stellar_ctx->need_exit))
{
timestamp_update();
sleep(1);
}
error_out:
packet_io_destroy(stellar_ctx_ptr->pkt_io);
thread_destroy(stellar_ctx_ptr->threads_ctx, nr_threads);
thread_context_free(stellar_ctx_ptr);
thread_destroy(stellar_ctx->threads_ctx, nr_threads);
thread_context_free(stellar_ctx, nr_threads);
packet_io_destroy(stellar_ctx->packet_io);
// TODO free plugin
id_generator_free();
log_free();
return 0;

View File

@@ -6,9 +6,18 @@ extern "C"
{
#endif
#include <stdint.h>
#define MAX_THREAD_NUM 256
#define RX_BURST_MAX 128
#define ATOMIC_INC(x) __atomic_fetch_add(x, 1, __ATOMIC_RELAXED)
#define ATOMIC_DEC(x) __atomic_fetch_sub(x, 1, __ATOMIC_RELAXED)
#define ATOMIC_READ(x) __atomic_load_n(x, __ATOMIC_RELAXED)
#define ATOMIC_ZERO(x) __atomic_fetch_and(x, 0, __ATOMIC_RELAXED)
#define ATOMIC_ADD(x, y) __atomic_fetch_add(x, y, __ATOMIC_RELAXED)
#define ATOMIC_SET(x, y) __atomic_store_n(x, y, __ATOMIC_RELAXED)
#ifdef STELLAR_GIT_VERSION
static __attribute__((__used__)) const char *__stellar_version = STELLAR_GIT_VERSION;
#else