feature: packet I/O suppport dumpfile list mode

This commit is contained in:
luwenpeng
2024-08-19 18:40:49 +08:00
parent 520eb085b8
commit 29cbe532ef
8 changed files with 72 additions and 98 deletions

View File

@@ -3,11 +3,13 @@ snowflake_base = 1 # [0, 31]
snowflake_offset = 2 # [0, 127] snowflake_offset = 2 # [0, 127]
[packet_io] [packet_io]
mode = dumpfile # dumpfile, marsio mode = dumpfile # dumpfile, dumpfilelist, marsio
app_symbol = stellar app_symbol = stellar
dev_symbol = nf_0_fw dev_symbol = nf_0_fw
dumpfile_dir = "/tmp/dumpfile/" dumpfile_path = "/tmp/dumpfile/dumpfile.pcap"
#dumpfile_path = "/tmp/dumpfile/dumpfilelist"
nr_threads = 1 # [1, 256] nr_threads = 1 # [1, 256]
cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12] cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12]

View File

@@ -67,6 +67,10 @@ static int parse_packet_io_section(toml_table_t *root, struct packet_io_options
{ {
opts->mode = PACKET_IO_DUMPFILE; opts->mode = PACKET_IO_DUMPFILE;
} }
else if (strcmp(ptr, "dumpfilelist") == 0)
{
opts->mode = PACKET_IO_DUMPFILELIST;
}
else if (strcmp(ptr, "marsio") == 0) else if (strcmp(ptr, "marsio") == 0)
{ {
opts->mode = PACKET_IO_MARSIO; opts->mode = PACKET_IO_MARSIO;
@@ -77,16 +81,16 @@ static int parse_packet_io_section(toml_table_t *root, struct packet_io_options
return -1; return -1;
} }
if (opts->mode == PACKET_IO_DUMPFILE) if (opts->mode == PACKET_IO_DUMPFILE || opts->mode == PACKET_IO_DUMPFILELIST)
{ {
ptr = toml_raw_in(table, "dumpfile_dir"); ptr = toml_raw_in(table, "dumpfile_path");
if (ptr == NULL) if (ptr == NULL)
{ {
CONFIG_LOG_ERROR("config file missing packet_io->dumpfile_dir"); CONFIG_LOG_ERROR("config file missing packet_io->dumpfile_path");
return -1; return -1;
} }
// skip "" // skip ""
strncpy(opts->dumpfile_dir, ptr + 1, strlen(ptr) - 2); strncpy(opts->dumpfile_path, ptr + 1, strlen(ptr) - 2);
} }
else else
{ {
@@ -594,10 +598,10 @@ void stellar_config_print(const struct stellar_config *config)
CONFIG_LOG_DEBUG("snowflake->snowflake_offset : %d", snowflake_opts->snowflake_offset); CONFIG_LOG_DEBUG("snowflake->snowflake_offset : %d", snowflake_opts->snowflake_offset);
// packet io config // packet io config
CONFIG_LOG_DEBUG("packet_io->mode : %s", pkt_io_opts->mode == PACKET_IO_DUMPFILE ? "dumpfile" : "marsio"); CONFIG_LOG_DEBUG("packet_io->mode : %s", pkt_io_opts->mode == PACKET_IO_DUMPFILE ? "dumpfile" : (pkt_io_opts->mode == PACKET_IO_DUMPFILELIST ? "dumpfilelist" : "marsio"));
if (pkt_io_opts->mode == PACKET_IO_DUMPFILE) if (pkt_io_opts->mode == PACKET_IO_DUMPFILE || pkt_io_opts->mode == PACKET_IO_DUMPFILELIST)
{ {
CONFIG_LOG_DEBUG("packet_io->dumpfile_dir : %s", pkt_io_opts->dumpfile_dir); CONFIG_LOG_DEBUG("packet_io->dumpfile_path : %s", pkt_io_opts->dumpfile_path);
} }
else else
{ {

View File

@@ -25,9 +25,9 @@
struct dumpfile_io struct dumpfile_io
{ {
enum packet_io_mode mode;
uint16_t nr_threads; uint16_t nr_threads;
char work_dir[256]; char dumpfile_path[256];
char directory[256];
pcap_t *pcap; pcap_t *pcap;
struct lock_free_queue *queue[MAX_THREAD_NUM]; struct lock_free_queue *queue[MAX_THREAD_NUM];
@@ -47,69 +47,6 @@ struct pcap_pkt
* Private API * Private API
******************************************************************************/ ******************************************************************************/
typedef int file_handle(const char *file, void *arg);
static int scan_directory(const char *dir, file_handle *handler, void *arg)
{
struct stat statbuf;
struct dirent *entry;
DIR *dp = opendir(dir);
if (NULL == dp)
{
PACKET_IO_LOG_ERROR("opendir %s failed, %s", dir, strerror(errno));
return -1;
}
if (chdir(dir) == -1)
{
PACKET_IO_LOG_ERROR("chdir %s failed, %s", dir, strerror(errno));
goto error_out;
}
while ((entry = readdir(dp)))
{
if (lstat(entry->d_name, &statbuf) == -1)
{
PACKET_IO_LOG_ERROR("lstat %s failed, %s", entry->d_name, strerror(errno));
goto error_out;
}
if (S_IFDIR & statbuf.st_mode)
{
if (!strcmp(".", entry->d_name) || !strcmp("..", entry->d_name))
{
continue;
}
if (scan_directory(entry->d_name, handler, arg) == -1)
{
goto error_out;
}
}
else
{
if (handler(entry->d_name, arg) == -1)
{
goto error_out;
}
}
}
if (chdir("..") == -1)
{
PACKET_IO_LOG_ERROR("chdir .. failed, %s", strerror(errno));
goto error_out;
}
closedir(dp);
return 0;
error_out:
closedir(dp);
return -1;
}
static void pcap_packet_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes) static void pcap_packet_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes)
{ {
struct dumpfile_io *handle = (struct dumpfile_io *)user; struct dumpfile_io *handle = (struct dumpfile_io *)user;
@@ -152,16 +89,15 @@ static void pcap_packet_handler(u_char *user, const struct pcap_pkthdr *h, const
} }
} }
static int dumpfile_handler(const char *file, void *arg) static int dumpfile_handler(struct dumpfile_io *handle, const char *pcap_file)
{ {
char resolved_path[256]; char resolved_path[256];
char pcap_errbuf[PCAP_ERRBUF_SIZE]; char pcap_errbuf[PCAP_ERRBUF_SIZE];
struct dumpfile_io *handle = (struct dumpfile_io *)arg;
realpath(file, resolved_path); realpath(pcap_file, resolved_path);
PACKET_IO_LOG_STATE("dumpfile %s in-processing", resolved_path) PACKET_IO_LOG_STATE("dumpfile %s in-processing", resolved_path)
handle->pcap = pcap_open_offline(file, pcap_errbuf); handle->pcap = pcap_open_offline(resolved_path, pcap_errbuf);
if (handle->pcap == NULL) if (handle->pcap == NULL)
{ {
PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf);
@@ -194,7 +130,39 @@ static void *dumpfile_thread(void *arg)
ATOMIC_SET(&handle->io_thread_is_runing, 1); ATOMIC_SET(&handle->io_thread_is_runing, 1);
PACKET_IO_LOG_STATE("dumpfile io thread is running"); PACKET_IO_LOG_STATE("dumpfile io thread is running");
scan_directory(handle->directory, dumpfile_handler, arg); if (handle->mode == PACKET_IO_DUMPFILE)
{
dumpfile_handler(handle, handle->dumpfile_path);
}
else // PACKET_IO_DUMPFILELIST
{
FILE *fp = fopen(handle->dumpfile_path, "r");
if (fp == NULL)
{
PACKET_IO_LOG_ERROR("unable to open dumpfile list: %s", handle->dumpfile_path);
goto erro_out;
}
char line[PATH_MAX];
while (fgets(line, sizeof(line), fp))
{
if (line[0] == '#')
{
continue;
}
char *pos = strchr(line, '\n');
if (pos)
{
*pos = '\0';
}
dumpfile_handler(handle, line);
}
fclose(fp);
}
erro_out:
PACKET_IO_LOG_STATE("dumpfile io thread processed all pcap files"); PACKET_IO_LOG_STATE("dumpfile io thread processed all pcap files");
while (ATOMIC_READ(&handle->io_thread_need_exit) == 0) while (ATOMIC_READ(&handle->io_thread_need_exit) == 0)
@@ -217,7 +185,7 @@ static void *dumpfile_thread(void *arg)
* Public API * Public API
******************************************************************************/ ******************************************************************************/
struct dumpfile_io *dumpfile_io_new(const char *directory, uint16_t nr_threads) struct dumpfile_io *dumpfile_io_new(const char *dumpfile_path, enum packet_io_mode mode, uint16_t nr_threads)
{ {
pthread_t tid; pthread_t tid;
struct dumpfile_io *handle = (struct dumpfile_io *)calloc(1, sizeof(struct dumpfile_io)); struct dumpfile_io *handle = (struct dumpfile_io *)calloc(1, sizeof(struct dumpfile_io));
@@ -226,14 +194,10 @@ struct dumpfile_io *dumpfile_io_new(const char *directory, uint16_t nr_threads)
PACKET_IO_LOG_ERROR("unable to allocate memory for dumpfile_io"); PACKET_IO_LOG_ERROR("unable to allocate memory for dumpfile_io");
return NULL; return NULL;
} }
if (getcwd(handle->work_dir, sizeof(handle->work_dir)) == NULL)
{
PACKET_IO_LOG_ERROR("unable to get current work directory");
goto error_out;
}
handle->mode = mode;
handle->nr_threads = nr_threads; handle->nr_threads = nr_threads;
strncpy(handle->directory, directory, MIN(strlen(directory), sizeof(handle->directory))); strncpy(handle->dumpfile_path, dumpfile_path, MIN(strlen(dumpfile_path), sizeof(handle->dumpfile_path)));
for (uint16_t i = 0; i < handle->nr_threads; i++) for (uint16_t i = 0; i < handle->nr_threads; i++)
{ {
@@ -419,7 +383,7 @@ int dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct pack
inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN); inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN);
inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN); inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN);
} }
snprintf(file, sizeof(file), "%s/inject-%s:%u-%s:%u-%lu.pcap", handle->work_dir, src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected); snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected);
if (packet_dump_pcap(pkt, file) == -1) if (packet_dump_pcap(pkt, file) == -1)
{ {

View File

@@ -8,7 +8,7 @@ extern "C"
#include "packet_io.h" #include "packet_io.h"
struct dumpfile_io; struct dumpfile_io;
struct dumpfile_io *dumpfile_io_new(const char *directory, uint16_t nr_threads); struct dumpfile_io *dumpfile_io_new(const char *dumpfile_path, enum packet_io_mode mode, uint16_t nr_threads);
void dumpfile_io_free(struct dumpfile_io *handle); void dumpfile_io_free(struct dumpfile_io *handle);
int dumpfile_io_wait_exit(struct dumpfile_io *handle); int dumpfile_io_wait_exit(struct dumpfile_io *handle);

View File

@@ -26,7 +26,7 @@ struct packet_io *packet_io_new(struct packet_io_options *opts)
} }
else else
{ {
packet_io->dumpfile = dumpfile_io_new(opts->dumpfile_dir, opts->nr_threads); packet_io->dumpfile = dumpfile_io_new(opts->dumpfile_path, packet_io->mode, opts->nr_threads);
} }
if (packet_io->marsio == NULL && packet_io->dumpfile == NULL) if (packet_io->marsio == NULL && packet_io->dumpfile == NULL)
{ {

View File

@@ -6,6 +6,7 @@ extern "C"
#endif #endif
#include <stdint.h> #include <stdint.h>
#include <limits.h>
#include "utils.h" #include "utils.h"
@@ -48,7 +49,8 @@ struct __attribute__((aligned(64))) packet_io_stat
enum packet_io_mode enum packet_io_mode
{ {
PACKET_IO_DUMPFILE = 0, PACKET_IO_DUMPFILE = 0,
PACKET_IO_MARSIO = 1, PACKET_IO_DUMPFILELIST = 1,
PACKET_IO_MARSIO = 2,
}; };
struct packet_io_options struct packet_io_options
@@ -56,7 +58,7 @@ struct packet_io_options
enum packet_io_mode mode; enum packet_io_mode mode;
// for dumpfile // for dumpfile
char dumpfile_dir[256]; char dumpfile_path[PATH_MAX];
// for marsio // for marsio
char app_symbol[64]; char app_symbol[64];

View File

@@ -3,12 +3,14 @@ snowflake_base = 1 # [0, 31]
snowflake_offset = 2 # [0, 127] snowflake_offset = 2 # [0, 127]
[packet_io] [packet_io]
mode = marsio # dumpfile, marsio mode = marsio # dumpfile, dumpfilelist, marsio
app_symbol = stellar app_symbol = stellar
dev_symbol = nf_0_fw dev_symbol = nf_0_fw
dumpfile_dir = "/tmp/dumpfile/" dumpfile_path = "/tmp/dumpfile/dumpfile.pcap"
nr_threads = 1 # [1, 256] #dumpfile_path = "/tmp/dumpfile/dumpfilelist"
nr_threads = 1 # [1, 256]
cpu_mask = [5] cpu_mask = [5]
[ip_reassembly] [ip_reassembly]

View File

@@ -145,8 +145,8 @@ static inline void expect_cmp_inject(const char *expect_pcap_file, const char *i
static inline void packet_inject_test(struct packet_inject_case *test) static inline void packet_inject_test(struct packet_inject_case *test)
{ {
// create directory // create directory
char dumpfile_dir[PATH_MAX] = {0}; char dumpfile_path[PATH_MAX] = {0};
snprintf(dumpfile_dir, sizeof(dumpfile_dir), "%s/input/", test->work_dir); snprintf(dumpfile_path, sizeof(dumpfile_path), "%s/input/%s", test->work_dir, test->input_pcap);
system_cmd("rm -rf %s", test->work_dir); system_cmd("rm -rf %s", test->work_dir);
system_cmd("mkdir -p %s/input/", test->work_dir); system_cmd("mkdir -p %s/input/", test->work_dir);
system_cmd("mkdir -p %s/log/", test->work_dir); system_cmd("mkdir -p %s/log/", test->work_dir);
@@ -173,9 +173,9 @@ static inline void packet_inject_test(struct packet_inject_case *test)
char temp[PATH_MAX * 2] = {0}; char temp[PATH_MAX * 2] = {0};
getcwd(cwd, sizeof(cwd)); getcwd(cwd, sizeof(cwd));
chdir(test->work_dir); chdir(test->work_dir);
snprintf(temp, sizeof(temp), "dumpfile_dir = \"%s\"", dumpfile_dir); snprintf(temp, sizeof(temp), "dumpfile_path = \"%s\"", dumpfile_path);
EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "mode = marsio", "mode = dumpfile") == 0); EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "mode = marsio", "mode = dumpfile") == 0);
EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "dumpfile_dir = \"/tmp/dumpfile/\"", temp) == 0); EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "dumpfile_path = \"/tmp/dumpfile/dumpfile.pcap\"", temp) == 0);
const char *stellar_cfg_file = "./conf/stellar.toml"; const char *stellar_cfg_file = "./conf/stellar.toml";
const char *plugin_cfg_file = "./plugin/spec.toml"; const char *plugin_cfg_file = "./plugin/spec.toml";