From d1d5e6e09e92361f16586f2f611351c45b2b932b Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Fri, 30 Aug 2024 10:21:44 +0800 Subject: [PATCH] refactor packet IO, rename dumpfile mode to pcap mode, modify related configuration --- conf/stellar.toml | 12 +- infra/core/stellar_config.c | 163 -------- infra/core/stellar_config.h | 5 - infra/core/stellar_core.c | 31 +- infra/packet_io/CMakeLists.txt | 2 +- infra/packet_io/dumpfile_io.h | 25 -- infra/packet_io/marsio_io.c | 156 +++---- infra/packet_io/marsio_io.h | 20 +- infra/packet_io/packet_io.c | 379 +++++++++++++----- infra/packet_io/packet_io.h | 40 +- infra/packet_io/{dumpfile_io.c => pcap_io.c} | 161 ++++---- infra/packet_io/pcap_io.h | 24 ++ .../test_based_on_stellar/env/stellar.toml | 12 +- test/lpi_plugin/CMakeLists.txt | 4 +- test/packet_inject/conf/stellar.toml | 12 +- test/packet_inject/packet_inject_test.h | 10 +- 16 files changed, 544 insertions(+), 512 deletions(-) delete mode 100644 infra/packet_io/dumpfile_io.h rename infra/packet_io/{dumpfile_io.c => pcap_io.c} (65%) create mode 100644 infra/packet_io/pcap_io.h diff --git a/conf/stellar.toml b/conf/stellar.toml index 360e6fe..880fad6 100644 --- a/conf/stellar.toml +++ b/conf/stellar.toml @@ -3,15 +3,13 @@ snowflake_base = 1 # [0, 31] snowflake_offset = 2 # [0, 127] [packet_io] -mode = "dumpfile" # dumpfile, dumpfilelist, marsio +mode = "pcapfile" # pcapfile, pcaplist, marsio app_symbol = "stellar" dev_symbol = "nf_0_fw" - -dumpfile_path = "/tmp/dumpfile/dumpfile.pcap" -#dumpfile_path = "/tmp/dumpfile/dumpfilelist" - -nr_worker_thread = 1 # [1, 256] +pcap_path = "/tmp/test.pcap" +nr_worker_thread = 1 # range: [1, 256] cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12] +idle_yield_interval_ms = 900 # range: [0, 60000] (ms) [ip_reassembly] enable = 1 @@ -67,5 +65,3 @@ tcp_reassembly_max_segments = 256 # range: [2, 4096] [schedule] merge_stat_interval = 500 # range: [1, 60000] (ms) output_stat_interval = 2000 # range: [1, 60000] (ms) - -packet_io_yield_interval = 900 # range: [1, 60000] (ms) diff --git a/infra/core/stellar_config.c b/infra/core/stellar_config.c index d1511fe..b847d11 100644 --- a/infra/core/stellar_config.c +++ b/infra/core/stellar_config.c @@ -42,132 +42,6 @@ static int parse_snowflake_section(toml_table_t *root, struct snowflake_options return 0; } -// return 0: success -// retuun -1: failed -static int parse_packet_io_section(toml_table_t *root, struct packet_io_options *opts) -{ - int ret = -1; - char *ptr_mode = NULL; - char *ptr_dumpfile_path = NULL; - char *ptr_app_symbol = NULL; - char *ptr_dev_symbol = NULL; - const char *ptr; - toml_table_t *table; - toml_array_t *mask_array; - - table = toml_table_in(root, "packet_io"); - if (table == NULL) - { - CONFIG_LOG_ERROR("config file missing packet_io section"); - goto error_out; - } - - ptr = toml_raw_in(table, "mode"); - if (ptr == NULL || toml_rtos(ptr, &ptr_mode) != 0) - { - CONFIG_LOG_ERROR("config file missing packet_io->mode"); - goto error_out; - } - if (strcmp(ptr_mode, "dumpfile") == 0) - { - opts->mode = PACKET_IO_DUMPFILE; - } - else if (strcmp(ptr_mode, "dumpfilelist") == 0) - { - opts->mode = PACKET_IO_DUMPFILELIST; - } - else if (strcmp(ptr_mode, "marsio") == 0) - { - opts->mode = PACKET_IO_MARSIO; - } - else - { - CONFIG_LOG_ERROR("config file invalid packet_io->mode %s, only support dumpfile and marsio", ptr); - goto error_out; - } - - if (opts->mode == PACKET_IO_DUMPFILE || opts->mode == PACKET_IO_DUMPFILELIST) - { - ptr = toml_raw_in(table, "dumpfile_path"); - if (ptr == NULL || toml_rtos(ptr, &ptr_dumpfile_path) != 0) - { - CONFIG_LOG_ERROR("config file missing packet_io->dumpfile_path"); - goto error_out; - } - strcpy(opts->dumpfile_path, ptr_dumpfile_path); - } - else - { - ptr = toml_raw_in(table, "app_symbol"); - if (ptr == NULL || toml_rtos(ptr, &ptr_app_symbol) != 0) - { - CONFIG_LOG_ERROR("config file missing packet_io->app_symbol"); - goto error_out; - } - strcpy(opts->app_symbol, ptr_app_symbol); - - ptr = toml_raw_in(table, "dev_symbol"); - if (ptr == NULL || toml_rtos(ptr, &ptr_dev_symbol) != 0) - { - CONFIG_LOG_ERROR("config file missing packet_io->dev_symbol"); - goto error_out; - } - strcpy(opts->dev_symbol, ptr_dev_symbol); - } - - ptr = toml_raw_in(table, "nr_worker_thread"); - if (ptr == NULL) - { - CONFIG_LOG_ERROR("config file missing packet_io->nr_worker_thread"); - goto error_out; - } - if (atoi(ptr) <= 0 || atoi(ptr) > MAX_THREAD_NUM) - { - CONFIG_LOG_ERROR("config file invalid packet_io->nr_worker_thread %d, range [1, %d]", atoi(ptr), MAX_THREAD_NUM); - goto error_out; - } - opts->nr_worker_thread = atoi(ptr); - - mask_array = toml_array_in(table, "cpu_mask"); - if (mask_array == NULL) - { - CONFIG_LOG_ERROR("config file missing packet_io->cpu_mask"); - goto error_out; - } - for (uint16_t i = 0; i < opts->nr_worker_thread; i++) - { - ptr = toml_raw_at(mask_array, i); - if (ptr == NULL) - { - CONFIG_LOG_ERROR("config file missing packet_io->cpu_mask[%d]", i); - goto error_out; - } - opts->cpu_mask[i] = atoi(ptr); - } - - ret = 0; - -error_out: - if (ptr_mode) - { - free(ptr_mode); - } - if (ptr_dumpfile_path) - { - free(ptr_dumpfile_path); - } - if (ptr_app_symbol) - { - free(ptr_app_symbol); - } - if (ptr_dev_symbol) - { - free(ptr_dev_symbol); - } - - return ret; -} - // return 0: success // retuun -1: failed static int parse_schedule_options(toml_table_t *root, struct schedule_options *opts) @@ -208,19 +82,6 @@ static int parse_schedule_options(toml_table_t *root, struct schedule_options *o return -1; } - ptr = toml_raw_in(table, "packet_io_yield_interval"); - if (ptr == NULL) - { - CONFIG_LOG_ERROR("config file missing schedule->packet_io_yield_interval"); - return -1; - } - opts->packet_io_yield_interval = atoll(ptr); - if (opts->packet_io_yield_interval < 1 || opts->packet_io_yield_interval > 60000) - { - CONFIG_LOG_ERROR("config file invalid schedule->packet_io_yield_interval %ld, range [1, 60000]", opts->packet_io_yield_interval); - return -1; - } - return 0; } @@ -252,11 +113,6 @@ int stellar_config_load(struct stellar_config *config, const char *file) goto error_out; } - if (parse_packet_io_section(table, &config->pkt_io_opts) != 0) - { - goto error_out; - } - if (parse_schedule_options(table, &config->sched_opts) != 0) { goto error_out; @@ -285,32 +141,13 @@ void stellar_config_print(const struct stellar_config *config) return; } - const struct packet_io_options *pkt_io_opts = &config->pkt_io_opts; const struct snowflake_options *snowflake_opts = &config->snowflake_opts; // snowflake config CONFIG_LOG_DEBUG("snowflake->snowflake_base : %d", snowflake_opts->snowflake_base); CONFIG_LOG_DEBUG("snowflake->snowflake_offset : %d", snowflake_opts->snowflake_offset); - // packet io config - CONFIG_LOG_DEBUG("packet_io->mode : %s", pkt_io_opts->mode == PACKET_IO_DUMPFILE ? "dumpfile" : (pkt_io_opts->mode == PACKET_IO_DUMPFILELIST ? "dumpfilelist" : "marsio")); - if (pkt_io_opts->mode == PACKET_IO_DUMPFILE || pkt_io_opts->mode == PACKET_IO_DUMPFILELIST) - { - CONFIG_LOG_DEBUG("packet_io->dumpfile_path : %s", pkt_io_opts->dumpfile_path); - } - else - { - CONFIG_LOG_DEBUG("packet_io->app_symbol : %s", pkt_io_opts->app_symbol); - CONFIG_LOG_DEBUG("packet_io->dev_symbol : %s", pkt_io_opts->dev_symbol); - } - CONFIG_LOG_DEBUG("packet_io->nr_worker_thread : %d", pkt_io_opts->nr_worker_thread); - for (uint16_t i = 0; i < pkt_io_opts->nr_worker_thread; i++) - { - CONFIG_LOG_DEBUG("packet_io->cpu_mask[%3d] : %d", i, pkt_io_opts->cpu_mask[i]); - } - // schedule config CONFIG_LOG_DEBUG("schedule->merge_stat_interval : %ld", config->sched_opts.merge_stat_interval); CONFIG_LOG_DEBUG("schedule->output_stat_interval : %ld", config->sched_opts.output_stat_interval); - CONFIG_LOG_DEBUG("schedule->packet_io_yield_interval : %ld", config->sched_opts.packet_io_yield_interval); } diff --git a/infra/core/stellar_config.h b/infra/core/stellar_config.h index 1e5f01e..6cd0a08 100644 --- a/infra/core/stellar_config.h +++ b/infra/core/stellar_config.h @@ -5,14 +5,10 @@ extern "C" { #endif -#include "packet_io.h" - struct schedule_options { uint64_t merge_stat_interval; // range: [1, 60000] (ms) uint64_t output_stat_interval; // range: [1, 60000] (ms) - - uint64_t packet_io_yield_interval; // range: [1, 60000] (ms) }; struct snowflake_options @@ -23,7 +19,6 @@ struct snowflake_options struct stellar_config { - struct packet_io_options pkt_io_opts; struct snowflake_options snowflake_opts; struct schedule_options sched_opts; }; diff --git a/infra/core/stellar_core.c b/infra/core/stellar_core.c index 83e9531..8545c7e 100644 --- a/infra/core/stellar_core.c +++ b/infra/core/stellar_core.c @@ -60,6 +60,7 @@ struct stellar_runtime struct stellar_thread threads[MAX_THREAD_NUM]; struct session_manager_config *sess_mgr_cfg; struct ip_reassembly_config *ip_reass_cfg; + struct packet_io_config *pkt_io_cfg; }; struct stellar @@ -145,7 +146,6 @@ static void *worker_thread(void *arg) }; uint64_t merge_stat_interval = config->sched_opts.merge_stat_interval; - uint64_t packet_io_yield_interval = config->sched_opts.packet_io_yield_interval; uint16_t thr_idx = thread->idx; __current_thread_idx = thr_idx; @@ -300,7 +300,7 @@ static void *worker_thread(void *arg) if (nr_pkt_received == 0) { - packet_io_yield(packet_io, thr_idx, packet_io_yield_interval); + packet_io_yield(packet_io, thr_idx); } } @@ -345,7 +345,7 @@ static int stellar_thread_init(struct stellar *st) struct stellar_config *config = &st->config; uint64_t now_ms = clock_get_real_time_ms(); - for (uint16_t i = 0; i < config->pkt_io_opts.nr_worker_thread; i++) + for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++) { struct stellar_thread *thread = &runtime->threads[i]; thread->idx = i; @@ -383,10 +383,9 @@ static int stellar_thread_init(struct stellar *st) static void stellar_thread_clean(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; - struct stellar_config *config = &st->config; CORE_LOG_FATAL("cleaning worker thread context ..."); - for (uint16_t i = 0; i < config->pkt_io_opts.nr_worker_thread; i++) + for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++) { struct stellar_thread *thread = &runtime->threads[i]; if (ATOMIC_READ(&thread->is_runing) == 0) @@ -402,9 +401,8 @@ static void stellar_thread_clean(struct stellar *st) static int stellar_thread_run(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; - struct stellar_config *config = &st->config; - for (uint16_t i = 0; i < config->pkt_io_opts.nr_worker_thread; i++) + for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++) { struct stellar_thread *thread = &runtime->threads[i]; if (pthread_create(&thread->tid, NULL, worker_thread, (void *)thread) < 0) @@ -420,10 +418,9 @@ static int stellar_thread_run(struct stellar *st) static void stellar_thread_join(struct stellar *st) { struct stellar_runtime *runtime = &st->runtime; - struct stellar_config *config = &st->config; CORE_LOG_FATAL("waiting worker thread stop ..."); - for (uint16_t i = 0; i < config->pkt_io_opts.nr_worker_thread; i++) + for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++) { struct stellar_thread *thread = &runtime->threads[i]; while (ATOMIC_READ(&thread->is_runing) == 1) @@ -489,7 +486,14 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg CORE_LOG_ERROR("unable to create ip reassembly config"); goto error_out; } + runtime->pkt_io_cfg = packet_io_config_new(st->stellar_cfg_file); + if (runtime->pkt_io_cfg == NULL) + { + CORE_LOG_ERROR("unable to create packet io config"); + goto error_out; + } + packet_io_config_print(runtime->pkt_io_cfg); session_manager_config_print(runtime->sess_mgr_cfg); ip_reassembly_config_print(runtime->ip_reass_cfg); if (stellar_config_load(config, st->stellar_cfg_file) != 0) @@ -499,7 +503,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg } stellar_config_print(config); - runtime->stat = stellar_stat_new(config->pkt_io_opts.nr_worker_thread); + runtime->stat = stellar_stat_new(runtime->pkt_io_cfg->nr_worker_thread); if (runtime->stat == NULL) { CORE_LOG_ERROR("unable to create stellar stat"); @@ -512,7 +516,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg goto error_out; } - runtime->packet_io = packet_io_new(&config->pkt_io_opts); + runtime->packet_io = packet_io_new(runtime->pkt_io_cfg); if (runtime->packet_io == NULL) { CORE_LOG_ERROR("unable to create packet io"); @@ -563,7 +567,7 @@ void stellar_run(struct stellar *st) } usleep(1000); // 1ms - // only available in dumpfile mode + // only available in pcap mode if (packet_io_isbreak(runtime->packet_io)) { ATOMIC_SET(&runtime->need_exit, 1); @@ -587,6 +591,7 @@ void stellar_free(struct stellar *st) packet_io_free(runtime->packet_io); plugin_manager_exit(runtime->plug_mgr); stellar_stat_free(runtime->stat); + packet_io_config_free(runtime->pkt_io_cfg); ip_reassembly_config_free(runtime->ip_reass_cfg); session_manager_config_free(runtime->sess_mgr_cfg); CORE_LOG_FATAL("stellar exit\n"); @@ -675,7 +680,7 @@ void stellar_send_build_packet(struct stellar *st, struct packet *pkt) int stellar_get_worker_thread_num(struct stellar *st) { - return st->config.pkt_io_opts.nr_worker_thread; + return st->runtime.pkt_io_cfg->nr_worker_thread; } struct logger *stellar_get_logger(struct stellar *st) diff --git a/infra/packet_io/CMakeLists.txt b/infra/packet_io/CMakeLists.txt index 3d77732..4d0baa9 100644 --- a/infra/packet_io/CMakeLists.txt +++ b/infra/packet_io/CMakeLists.txt @@ -1,3 +1,3 @@ -add_library(packet_io dumpfile_io.c marsio_io.c packet_io.c) +add_library(packet_io pcap_io.c marsio_io.c packet_io.c) target_include_directories(packet_io PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_link_libraries(packet_io marsio pcap packet_parser) \ No newline at end of file diff --git a/infra/packet_io/dumpfile_io.h b/infra/packet_io/dumpfile_io.h deleted file mode 100644 index 05f4ccc..0000000 --- a/infra/packet_io/dumpfile_io.h +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -#include "packet_io.h" - -struct dumpfile_io; -struct dumpfile_io *dumpfile_io_new(const char *dumpfile_path, enum packet_io_mode mode, uint16_t nr_worker_thread); -void dumpfile_io_free(struct dumpfile_io *handle); -int dumpfile_io_isbreak(struct dumpfile_io *handle); - -int dumpfile_io_init(struct dumpfile_io *handle, uint16_t thr_idx); -uint16_t dumpfile_io_input(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -void dumpfile_io_output(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -void dumpfile_io_yield(struct dumpfile_io *handle, uint16_t thr_idx, uint64_t timeout_ms); -struct packet_io_stat *dumpfile_io_stat(struct dumpfile_io *handle, uint16_t thr_idx); - -#ifdef __cplusplus -} -#endif diff --git a/infra/packet_io/marsio_io.c b/infra/packet_io/marsio_io.c index 96d39fd..c7486d2 100644 --- a/infra/packet_io/marsio_io.c +++ b/infra/packet_io/marsio_io.c @@ -10,10 +10,11 @@ #include "packet_parser.h" #include "packet_private.h" -#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "marsio", format, ##__VA_ARGS__) +#define MARSIO_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "marsio", format, ##__VA_ARGS__) struct marsio_io { + struct packet_io_config cfg; struct mr_instance *mr_ins; struct mr_vdev *mr_dev; struct mr_sendpath *mr_path; @@ -41,7 +42,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p } else { - PACKET_IO_LOG_ERROR("failed to get route ctx"); + MARSIO_IO_LOG_ERROR("failed to get route ctx"); } sids.used = marsio_buff_get_sid_list(mbuff, sids.sid, sizeof(sids.sid) / sizeof(sids.sid[0])); @@ -51,7 +52,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p } else { - PACKET_IO_LOG_ERROR("failed to get sids"); + MARSIO_IO_LOG_ERROR("failed to get sids"); } if (marsio_buff_get_metadata(mbuff, MR_BUFF_SESSION_ID, &session_id, sizeof(session_id)) == sizeof(session_id)) @@ -60,7 +61,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p } else { - PACKET_IO_LOG_ERROR("failed to get session id"); + MARSIO_IO_LOG_ERROR("failed to get session id"); } // TODO @@ -71,7 +72,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p } else { - PACKET_IO_LOG_ERROR("failed to get domain id"); + MARSIO_IO_LOG_ERROR("failed to get domain id"); } #endif @@ -81,7 +82,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p } else { - PACKET_IO_LOG_ERROR("failed to get link id"); + MARSIO_IO_LOG_ERROR("failed to get link id"); } is_ctrl = marsio_buff_is_ctrlbuf(mbuff); @@ -93,7 +94,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p } else { - PACKET_IO_LOG_ERROR("failed to get direction"); + MARSIO_IO_LOG_ERROR("failed to get direction"); } packet_set_action(pkt, PACKET_ACTION_FORWARD); @@ -116,30 +117,30 @@ static void metadata_from_packet_to_mbuff(struct packet *pkt, marsio_buff_t *mbu if (marsio_buff_set_metadata(mbuff, MR_BUFF_ROUTE_CTX, (void *)route_ctx->data, route_ctx->used) != 0) { - PACKET_IO_LOG_ERROR("failed to set route ctx"); + MARSIO_IO_LOG_ERROR("failed to set route ctx"); } if (marsio_buff_set_sid_list(mbuff, (sid_t *)sids->sid, sids->used) != 0) { - PACKET_IO_LOG_ERROR("failed to set sids"); + MARSIO_IO_LOG_ERROR("failed to set sids"); } if (marsio_buff_set_metadata(mbuff, MR_BUFF_SESSION_ID, &session_id, sizeof(session_id)) != 0) { - PACKET_IO_LOG_ERROR("failed to set session id"); + MARSIO_IO_LOG_ERROR("failed to set session id"); } // TODO #if 0 if (marsio_buff_set_metadata(mbuff, MR_BUFF_DOMAIN, &domain, sizeof(domain)) != 0) { - PACKET_IO_LOG_ERROR("failed to set domain id"); + MARSIO_IO_LOG_ERROR("failed to set domain id"); } #endif if (marsio_buff_set_metadata(mbuff, MR_BUFF_LINK_ID, &link_id, sizeof(link_id)) != 0) { - PACKET_IO_LOG_ERROR("failed to set link id"); + MARSIO_IO_LOG_ERROR("failed to set link id"); } if (is_ctrl) @@ -149,7 +150,7 @@ static void metadata_from_packet_to_mbuff(struct packet *pkt, marsio_buff_t *mbu if (marsio_buff_set_metadata(mbuff, MR_BUFF_DIR, &direction, sizeof(direction)) != 0) { - PACKET_IO_LOG_ERROR("failed to set direction"); + MARSIO_IO_LOG_ERROR("failed to set direction"); } } @@ -175,46 +176,48 @@ static inline int is_keepalive_packet(const char *data, int len) * Public API ******************************************************************************/ -struct marsio_io *marsio_io_new(const char *app_symbol, const char *dev_symbol, uint16_t *cpu_mask, uint16_t nr_worker_thread) +void *marsio_io_new(const struct packet_io_config *cfg) { int opt = 1; cpu_set_t coremask; CPU_ZERO(&coremask); - for (uint16_t i = 0; i < nr_worker_thread; i++) - { - CPU_SET(cpu_mask[i], &coremask); - } struct marsio_io *handle = (struct marsio_io *)calloc(1, sizeof(struct marsio_io)); if (handle == NULL) { - PACKET_IO_LOG_ERROR("unable to allocate memory for marsio_io"); + MARSIO_IO_LOG_ERROR("unable to allocate memory for marsio_io"); return NULL; } + memcpy(&handle->cfg, cfg, sizeof(struct packet_io_config)); + + for (uint16_t i = 0; i < handle->cfg.nr_worker_thread; i++) + { + CPU_SET(handle->cfg.cpu_mask[i], &coremask); + } handle->mr_ins = marsio_create(); if (handle->mr_ins == NULL) { - PACKET_IO_LOG_ERROR("unable to create marsio instance"); + MARSIO_IO_LOG_ERROR("unable to create marsio instance"); goto error_out; } marsio_option_set(handle->mr_ins, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &coremask, sizeof(cpu_set_t)); marsio_option_set(handle->mr_ins, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)); - if (marsio_init(handle->mr_ins, app_symbol) != 0) + if (marsio_init(handle->mr_ins, handle->cfg.app_symbol) != 0) { - PACKET_IO_LOG_ERROR("unable to init marsio instance"); + MARSIO_IO_LOG_ERROR("unable to init marsio instance"); goto error_out; } - handle->mr_dev = marsio_open_device(handle->mr_ins, dev_symbol, nr_worker_thread, nr_worker_thread); + handle->mr_dev = marsio_open_device(handle->mr_ins, handle->cfg.dev_symbol, handle->cfg.nr_worker_thread, handle->cfg.nr_worker_thread); if (handle->mr_dev == NULL) { - PACKET_IO_LOG_ERROR("unable to open marsio device"); + MARSIO_IO_LOG_ERROR("unable to open marsio device"); goto error_out; } handle->mr_path = marsio_sendpath_create_by_vdev(handle->mr_dev); if (handle->mr_path == NULL) { - PACKET_IO_LOG_ERROR("unable to create marsio sendpath"); + MARSIO_IO_LOG_ERROR("unable to create marsio sendpath"); goto error_out; } @@ -225,58 +228,66 @@ error_out: return NULL; } -void marsio_io_free(struct marsio_io *handle) +void marsio_io_free(void *handle) { - if (handle) + struct marsio_io *mr_io = (struct marsio_io *)handle; + if (mr_io) { - if (handle->mr_path) + if (mr_io->mr_path) { - marsio_sendpath_destory(handle->mr_path); - handle->mr_path = NULL; + marsio_sendpath_destory(mr_io->mr_path); + mr_io->mr_path = NULL; } - if (handle->mr_dev) + if (mr_io->mr_dev) { - marsio_close_device(handle->mr_dev); - handle->mr_dev = NULL; + marsio_close_device(mr_io->mr_dev); + mr_io->mr_dev = NULL; } - if (handle->mr_ins) + if (mr_io->mr_ins) { - marsio_destory(handle->mr_ins); - handle->mr_ins = NULL; + marsio_destory(mr_io->mr_ins); + mr_io->mr_ins = NULL; } - free(handle); - handle = NULL; + free(mr_io); + mr_io = NULL; } } -int marsio_io_init(struct marsio_io *handle, uint16_t thr_idx __attribute__((unused))) +int marsio_io_isbreak(void *handle __attribute__((unused))) { - if (marsio_thread_init(handle->mr_ins) != 0) + return 0; +} + +int marsio_io_init(void *handle, uint16_t thr_idx __attribute__((unused))) +{ + struct marsio_io *mr_io = (struct marsio_io *)handle; + if (marsio_thread_init(mr_io->mr_ins) != 0) { - PACKET_IO_LOG_ERROR("unable to init marsio thread"); + MARSIO_IO_LOG_ERROR("unable to init marsio thread"); return -1; } return 0; } -uint16_t marsio_io_input(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +uint16_t marsio_io_input(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { + int len; + char *data; + uint16_t nr_packet_parsed = 0; struct packet *pkt; marsio_buff_t *mbuff; marsio_buff_t *rx_buffs[RX_BURST_MAX]; - struct packet_io_stat *stat = &handle->stat[thr_idx]; - uint16_t nr_parsed = 0; - int len; - char *data; + struct marsio_io *mr_io = (struct marsio_io *)handle; + struct packet_io_stat *stat = &mr_io->stat[thr_idx]; - int nr_recv = marsio_recv_burst(handle->mr_dev, thr_idx, rx_buffs, MIN(RX_BURST_MAX, nr_pkts)); - if (nr_recv <= 0) + int nr_packet_received = marsio_recv_burst(mr_io->mr_dev, thr_idx, rx_buffs, MIN(RX_BURST_MAX, nr_pkts)); + if (nr_packet_received <= 0) { - return nr_parsed; + return nr_packet_parsed; } - for (int i = 0; i < nr_recv; i++) + for (int i = 0; i < nr_packet_received; i++) { mbuff = rx_buffs[i]; data = marsio_buff_mtod(mbuff); @@ -293,11 +304,11 @@ uint16_t marsio_io_input(struct marsio_io *handle, uint16_t thr_idx, struct pack stat->pkts_tx++; stat->bytes_tx += len; - marsio_send_burst(handle->mr_path, thr_idx, &mbuff, 1); + marsio_send_burst(mr_io->mr_path, thr_idx, &mbuff, 1); continue; } - pkt = &pkts[nr_parsed++]; + pkt = &pkts[nr_packet_parsed++]; packet_parse(pkt, data, len); metadata_from_mbuff_to_packet(mbuff, pkt); @@ -313,15 +324,16 @@ uint16_t marsio_io_input(struct marsio_io *handle, uint16_t thr_idx, struct pack } } - return nr_parsed; + return nr_packet_parsed; } -void marsio_io_output(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +void marsio_io_output(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { + int len; struct packet *pkt; marsio_buff_t *mbuff; - struct packet_io_stat *stat = &handle->stat[thr_idx]; - int len; + struct marsio_io *mr_io = (struct marsio_io *)handle; + struct packet_io_stat *stat = &mr_io->stat[thr_idx]; for (uint16_t i = 0; i < nr_pkts; i++) { @@ -346,16 +358,17 @@ void marsio_io_output(struct marsio_io *handle, uint16_t thr_idx, struct packet stat->raw_bytes_tx += len; } - marsio_send_burst(handle->mr_path, thr_idx, &mbuff, 1); + marsio_send_burst(mr_io->mr_path, thr_idx, &mbuff, 1); packet_free(pkt); } } -void marsio_io_drop(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +void marsio_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { struct packet *pkt; marsio_buff_t *mbuff; - struct packet_io_stat *stat = &handle->stat[thr_idx]; + struct marsio_io *mr_io = (struct marsio_io *)handle; + struct packet_io_stat *stat = &mr_io->stat[thr_idx]; for (uint16_t i = 0; i < nr_pkts; i++) { @@ -365,29 +378,30 @@ void marsio_io_drop(struct marsio_io *handle, uint16_t thr_idx, struct packet *p { stat->pkts_dropped++; stat->bytes_dropped += packet_get_raw_len(pkt); - marsio_buff_free(handle->mr_ins, &mbuff, 1, 0, thr_idx); + marsio_buff_free(mr_io->mr_ins, &mbuff, 1, 0, thr_idx); } packet_free(pkt); } } -uint16_t marsio_io_inject(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +uint16_t marsio_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { int len; char *ptr; uint16_t nr_inject = 0; struct packet *pkt; marsio_buff_t *mbuff; - struct packet_io_stat *stat = &handle->stat[thr_idx]; + struct marsio_io *mr_io = (struct marsio_io *)handle; + struct packet_io_stat *stat = &mr_io->stat[thr_idx]; for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; len = packet_get_raw_len(pkt); - if (marsio_buff_malloc_global(handle->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0) + if (marsio_buff_malloc_global(mr_io->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0) { - PACKET_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet"); + MARSIO_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet"); continue; } @@ -406,23 +420,25 @@ uint16_t marsio_io_inject(struct marsio_io *handle, uint16_t thr_idx, struct pac memcpy(ptr, packet_get_raw_data(pkt), len); metadata_from_packet_to_mbuff(pkt, mbuff); - marsio_send_burst_with_options(handle->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH); + marsio_send_burst_with_options(mr_io->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH); packet_free(pkt); } return nr_inject; } -void marsio_io_yield(struct marsio_io *handle, uint16_t thr_idx, uint64_t timeout_ms) +void marsio_io_yield(void *handle, uint16_t thr_idx) { + struct marsio_io *mr_io = (struct marsio_io *)handle; struct mr_vdev *vdevs[1] = { - handle->mr_dev, + mr_io->mr_dev, }; - marsio_poll_wait(handle->mr_ins, vdevs, 1, thr_idx, timeout_ms); + marsio_poll_wait(mr_io->mr_ins, vdevs, 1, thr_idx, mr_io->cfg.idle_yield_interval_ms); } -struct packet_io_stat *marsio_io_stat(struct marsio_io *handle, uint16_t thr_idx) +struct packet_io_stat *marsio_io_stat(void *handle, uint16_t thr_idx) { - return &handle->stat[thr_idx]; + struct marsio_io *mr_io = (struct marsio_io *)handle; + return &mr_io->stat[thr_idx]; } diff --git a/infra/packet_io/marsio_io.h b/infra/packet_io/marsio_io.h index daccc4e..876618a 100644 --- a/infra/packet_io/marsio_io.h +++ b/infra/packet_io/marsio_io.h @@ -7,17 +7,17 @@ extern "C" #include "packet_io.h" -struct marsio_io; -struct marsio_io *marsio_io_new(const char *app_symbol, const char *dev_symbol, uint16_t *cpu_mask, uint16_t nr_worker_thread); -void marsio_io_free(struct marsio_io *handle); +void *marsio_io_new(const struct packet_io_config *cfg); +void marsio_io_free(void *handle); +int marsio_io_isbreak(void *handle); -int marsio_io_init(struct marsio_io *handle, uint16_t thr_idx); -uint16_t marsio_io_input(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -void marsio_io_output(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -void marsio_io_drop(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -uint16_t marsio_io_inject(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -void marsio_io_yield(struct marsio_io *handle, uint16_t thr_idx, uint64_t timeout_ms); -struct packet_io_stat *marsio_io_stat(struct marsio_io *handle, uint16_t thr_idx); +int marsio_io_init(void *handle, uint16_t thr_idx); +uint16_t marsio_io_input(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void marsio_io_output(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void marsio_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +uint16_t marsio_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void marsio_io_yield(void *handle, uint16_t thr_idx); +struct packet_io_stat *marsio_io_stat(void *handle, uint16_t thr_idx); #ifdef __cplusplus } diff --git a/infra/packet_io/packet_io.c b/infra/packet_io/packet_io.c index 3bd00e4..42a9743 100644 --- a/infra/packet_io/packet_io.c +++ b/infra/packet_io/packet_io.c @@ -1,154 +1,341 @@ +#include #include #include +#include "toml.h" +#include "pcap_io.h" #include "marsio_io.h" -#include "dumpfile_io.h" +#include "log_private.h" + +#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet_io", format, ##__VA_ARGS__) +#define PACKET_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "packet_io", format, ##__VA_ARGS__) struct packet_io { - enum packet_io_mode mode; - struct marsio_io *marsio; - struct dumpfile_io *dumpfile; + void *handle; + + void *(*new_func)(const struct packet_io_config *cfg); + void (*free_func)(void *handle); + int (*isbreak_func)(void *handle); + + int (*init_func)(void *handle, uint16_t thr_idx); + uint16_t (*input_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); + void (*output_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); + void (*drop_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); + uint16_t (*inject_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); + void (*yield_func)(void *handle, uint16_t thr_idx); + struct packet_io_stat *(*stat_func)(void *handle, uint16_t thr_idx); }; -struct packet_io *packet_io_new(struct packet_io_options *opts) +int packet_io_config_load(struct packet_io_config *cfg, const char *toml_file) { - struct packet_io *packet_io = (struct packet_io *)calloc(1, sizeof(struct packet_io)); - if (packet_io == NULL) + int ret = -1; + const char *ptr; + char *ptr_mode = NULL; + char *ptr_dumpfile_path = NULL; + char *ptr_app_symbol = NULL; + char *ptr_dev_symbol = NULL; + char errbuf[200]; + FILE *fp = NULL; + toml_table_t *root = NULL; + toml_table_t *table = NULL; + toml_array_t *mask; + + fp = fopen(toml_file, "r"); + if (fp == NULL) + { + PACKET_IO_LOG_ERROR("config file %s open failed, %s", toml_file, strerror(errno)); + goto error_out; + } + + root = toml_parse_file(fp, errbuf, sizeof(errbuf)); + if (root == NULL) + { + PACKET_IO_LOG_ERROR("config file %s parse failed, %s", toml_file, errbuf); + goto error_out; + } + + table = toml_table_in(root, "packet_io"); + if (table == NULL) + { + PACKET_IO_LOG_ERROR("config file %s missing packet_io", toml_file); + goto error_out; + } + + ptr = toml_raw_in(table, "mode"); + if (ptr == NULL || toml_rtos(ptr, &ptr_mode) != 0) + { + PACKET_IO_LOG_ERROR("config file missing packet_io.mode"); + goto error_out; + } + if (strcmp(ptr_mode, "pcapfile") == 0) + { + cfg->mode = PACKET_IO_PCAPFILE; + } + else if (strcmp(ptr_mode, "pcaplist") == 0) + { + cfg->mode = PACKET_IO_PCAPLIST; + } + else if (strcmp(ptr_mode, "marsio") == 0) + { + cfg->mode = PACKET_IO_MARSIO; + } + else + { + PACKET_IO_LOG_ERROR("config file invalid packet_io.mode %s", ptr); + goto error_out; + } + + if (cfg->mode == PACKET_IO_PCAPFILE || cfg->mode == PACKET_IO_PCAPLIST) + { + ptr = toml_raw_in(table, "pcap_path"); + if (ptr == NULL || toml_rtos(ptr, &ptr_dumpfile_path) != 0) + { + PACKET_IO_LOG_ERROR("config file missing packet_io.pcap_path"); + goto error_out; + } + strcpy(cfg->pcap_path, ptr_dumpfile_path); + } + else + { + ptr = toml_raw_in(table, "app_symbol"); + if (ptr == NULL || toml_rtos(ptr, &ptr_app_symbol) != 0) + { + PACKET_IO_LOG_ERROR("config file missing packet_io.app_symbol"); + goto error_out; + } + strcpy(cfg->app_symbol, ptr_app_symbol); + + ptr = toml_raw_in(table, "dev_symbol"); + if (ptr == NULL || toml_rtos(ptr, &ptr_dev_symbol) != 0) + { + PACKET_IO_LOG_ERROR("config file missing packet_io.dev_symbol"); + goto error_out; + } + strcpy(cfg->dev_symbol, ptr_dev_symbol); + } + + ptr = toml_raw_in(table, "nr_worker_thread"); + if (ptr == NULL) + { + PACKET_IO_LOG_ERROR("config file missing packet_io.nr_worker_thread"); + goto error_out; + } + cfg->nr_worker_thread = atoi(ptr); + if (cfg->nr_worker_thread == 0 || cfg->nr_worker_thread > MAX_THREAD_NUM) + { + PACKET_IO_LOG_ERROR("config file invalid packet_io.nr_worker_thread %d, range [1, %d]", cfg->nr_worker_thread, MAX_THREAD_NUM); + goto error_out; + } + + mask = toml_array_in(table, "cpu_mask"); + if (mask == NULL) + { + PACKET_IO_LOG_ERROR("config file missing packet_io.cpu_mask"); + goto error_out; + } + for (uint16_t i = 0; i < cfg->nr_worker_thread; i++) + { + ptr = toml_raw_at(mask, i); + if (ptr == NULL) + { + PACKET_IO_LOG_ERROR("config file missing packet_io.cpu_mask[%d]", i); + goto error_out; + } + cfg->cpu_mask[i] = atoi(ptr); + } + + ptr = toml_raw_in(table, "idle_yield_interval_ms"); + if (ptr == NULL) + { + PACKET_IO_LOG_ERROR("config file missing packet_io.idle_yield_interval_ms"); + goto error_out; + } + cfg->idle_yield_interval_ms = atoll(ptr); + if (cfg->idle_yield_interval_ms > 60000) + { + PACKET_IO_LOG_ERROR("config file invalid packet_io.idle_yield_interval_ms %d, range [0, %d]", cfg->idle_yield_interval_ms, 60000); + goto error_out; + } + + ret = 0; +error_out: + if (ptr_mode) + { + free(ptr_mode); + } + if (ptr_dumpfile_path) + { + free(ptr_dumpfile_path); + } + if (ptr_app_symbol) + { + free(ptr_app_symbol); + } + if (ptr_dev_symbol) + { + free(ptr_dev_symbol); + } + if (root) + { + toml_free(root); + } + if (fp) + { + fclose(fp); + } + + return ret; +} + +struct packet_io_config *packet_io_config_new(const char *toml_file) +{ + if (toml_file == NULL) { return NULL; } - packet_io->mode = opts->mode; - if (opts->mode == PACKET_IO_MARSIO) + struct packet_io_config *cfg = (struct packet_io_config *)calloc(1, sizeof(struct packet_io_config)); + if (cfg == NULL) { - packet_io->marsio = marsio_io_new(opts->app_symbol, opts->dev_symbol, opts->cpu_mask, opts->nr_worker_thread); - } - else - { - packet_io->dumpfile = dumpfile_io_new(opts->dumpfile_path, packet_io->mode, opts->nr_worker_thread); - } - if (packet_io->marsio == NULL && packet_io->dumpfile == NULL) - { - goto error_out; + return NULL; } - return packet_io; + if (packet_io_config_load(cfg, toml_file) == -1) + { + packet_io_config_free(cfg); + return NULL; + } -error_out: - packet_io_free(packet_io); - return NULL; + return cfg; } -void packet_io_free(struct packet_io *packet_io) +void packet_io_config_free(struct packet_io_config *cfg) { - if (packet_io) + if (cfg) { - if (likely(packet_io->mode == PACKET_IO_MARSIO)) + free(cfg); + cfg = NULL; + } +} + +void packet_io_config_print(const struct packet_io_config *cfg) +{ + if (cfg) + { + PACKET_IO_LOG_INFO("packet_io.mode : %s", cfg->mode == PACKET_IO_PCAPFILE ? "pcapfile" : (cfg->mode == PACKET_IO_PCAPLIST ? "pcaplist" : "marsio")); + if (cfg->mode == PACKET_IO_PCAPFILE || cfg->mode == PACKET_IO_PCAPLIST) { - marsio_io_free(packet_io->marsio); + PACKET_IO_LOG_INFO("packet_io.pcap_path : %s", cfg->pcap_path); } else { - dumpfile_io_free(packet_io->dumpfile); + PACKET_IO_LOG_INFO("packet_io.app_symbol : %s", cfg->app_symbol); + PACKET_IO_LOG_INFO("packet_io.dev_symbol : %s", cfg->dev_symbol); } - free(packet_io); - packet_io = NULL; + PACKET_IO_LOG_INFO("packet_io.nr_worker_thread : %d", cfg->nr_worker_thread); + for (uint16_t i = 0; i < cfg->nr_worker_thread; i++) + { + PACKET_IO_LOG_INFO("packet_io.cpu_mask[%03d] : %d", i, cfg->cpu_mask[i]); + } + PACKET_IO_LOG_INFO("packet_io.idle_yield_interval_ms : %lu", cfg->idle_yield_interval_ms); } } -int packet_io_isbreak(struct packet_io *packet_io) // used for dumpfile mode +struct packet_io *packet_io_new(const struct packet_io_config *cfg) { - if (likely(packet_io->mode == PACKET_IO_MARSIO)) + struct packet_io *pkt_io = (struct packet_io *)calloc(1, sizeof(struct packet_io)); + if (pkt_io == NULL) { - return 0; + return NULL; + } + + if (cfg->mode == PACKET_IO_MARSIO) + { + pkt_io->new_func = marsio_io_new; + pkt_io->free_func = marsio_io_free; + pkt_io->isbreak_func = marsio_io_isbreak; + pkt_io->init_func = marsio_io_init; + pkt_io->input_func = marsio_io_input; + pkt_io->output_func = marsio_io_output; + pkt_io->drop_func = marsio_io_drop; + pkt_io->inject_func = marsio_io_inject; + pkt_io->yield_func = marsio_io_yield; + pkt_io->stat_func = marsio_io_stat; } else { - return dumpfile_io_isbreak(packet_io->dumpfile); + pkt_io->new_func = pcap_io_new; + pkt_io->free_func = pcap_io_free; + pkt_io->isbreak_func = pcap_io_isbreak; + pkt_io->init_func = pcap_io_init; + pkt_io->input_func = pcap_io_input; + pkt_io->output_func = pcap_io_output; + pkt_io->drop_func = pcap_io_drop; + pkt_io->inject_func = pcap_io_inject; + pkt_io->yield_func = pcap_io_yield; + pkt_io->stat_func = pcap_io_stat; + } + + pkt_io->handle = pkt_io->new_func(cfg); + if (pkt_io->handle == NULL) + { + packet_io_free(pkt_io); + return NULL; + } + + return pkt_io; +} + +void packet_io_free(struct packet_io *pkt_io) +{ + if (pkt_io) + { + if (pkt_io->handle) + { + pkt_io->free_func(pkt_io->handle); + } + free(pkt_io); + pkt_io = NULL; } } -int packet_io_init(struct packet_io *packet_io, uint16_t thr_idx) +int packet_io_isbreak(struct packet_io *pkt_io) { - if (likely(packet_io->mode == PACKET_IO_MARSIO)) - { - return marsio_io_init(packet_io->marsio, thr_idx); - } - else - { - return dumpfile_io_init(packet_io->dumpfile, thr_idx); - } + return pkt_io->isbreak_func(pkt_io->handle); } -uint16_t packet_io_input(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +int packet_io_init(struct packet_io *pkt_io, uint16_t thr_idx) { - if (likely(packet_io->mode == PACKET_IO_MARSIO)) - { - return marsio_io_input(packet_io->marsio, thr_idx, pkts, nr_pkts); - } - else - { - return dumpfile_io_input(packet_io->dumpfile, thr_idx, pkts, nr_pkts); - } + return pkt_io->init_func(pkt_io->handle, thr_idx); } -void packet_io_output(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +uint16_t packet_io_input(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { - if (likely(packet_io->mode == PACKET_IO_MARSIO)) - { - marsio_io_output(packet_io->marsio, thr_idx, pkts, nr_pkts); - } - else - { - dumpfile_io_output(packet_io->dumpfile, thr_idx, pkts, nr_pkts); - } + return pkt_io->input_func(pkt_io->handle, thr_idx, pkts, nr_pkts); } -void packet_io_drop(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +void packet_io_output(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { - if (likely(packet_io->mode == PACKET_IO_MARSIO)) - { - marsio_io_drop(packet_io->marsio, thr_idx, pkts, nr_pkts); - } - else - { - dumpfile_io_drop(packet_io->dumpfile, thr_idx, pkts, nr_pkts); - } + pkt_io->output_func(pkt_io->handle, thr_idx, pkts, nr_pkts); } -uint16_t packet_io_inject(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { - if (likely(packet_io->mode == PACKET_IO_MARSIO)) - { - return marsio_io_inject(packet_io->marsio, thr_idx, pkts, nr_pkts); - } - else - { - return dumpfile_io_inject(packet_io->dumpfile, thr_idx, pkts, nr_pkts); - } + pkt_io->drop_func(pkt_io->handle, thr_idx, pkts, nr_pkts); } -void packet_io_yield(struct packet_io *packet_io, uint16_t thr_idx, uint64_t timeout_ms) +uint16_t packet_io_inject(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { - if (likely(packet_io->mode == PACKET_IO_MARSIO)) - { - marsio_io_yield(packet_io->marsio, thr_idx, timeout_ms); - } - else - { - dumpfile_io_yield(packet_io->dumpfile, thr_idx, timeout_ms); - } + return pkt_io->inject_func(pkt_io->handle, thr_idx, pkts, nr_pkts); } -struct packet_io_stat *packet_io_stat(struct packet_io *packet_io, uint16_t thr_idx) +void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx) { - if (likely(packet_io->mode == PACKET_IO_MARSIO)) - { - return marsio_io_stat(packet_io->marsio, thr_idx); - } - else - { - return dumpfile_io_stat(packet_io->dumpfile, thr_idx); - } + pkt_io->yield_func(pkt_io->handle, thr_idx); +} + +struct packet_io_stat *packet_io_stat(struct packet_io *pkt_io, uint16_t thr_idx) +{ + return pkt_io->stat_func(pkt_io->handle, thr_idx); } diff --git a/infra/packet_io/packet_io.h b/infra/packet_io/packet_io.h index 733b6c3..985ab39 100644 --- a/infra/packet_io/packet_io.h +++ b/infra/packet_io/packet_io.h @@ -48,39 +48,39 @@ struct __attribute__((aligned(64))) packet_io_stat enum packet_io_mode { - PACKET_IO_DUMPFILE = 0, - PACKET_IO_DUMPFILELIST = 1, + PACKET_IO_PCAPFILE = 0, + PACKET_IO_PCAPLIST = 1, PACKET_IO_MARSIO = 2, }; -struct packet_io_options +struct packet_io_config { enum packet_io_mode mode; - - // for dumpfile - char dumpfile_path[PATH_MAX]; - - // for marsio + char pcap_path[PATH_MAX]; char app_symbol[64]; char dev_symbol[64]; - - uint16_t nr_worker_thread; + uint16_t nr_worker_thread; // range [1, MAX_THREAD_NUM] uint16_t cpu_mask[MAX_THREAD_NUM]; + uint64_t idle_yield_interval_ms; // range: [0, 6000] (ms) }; +struct packet_io_config *packet_io_config_new(const char *toml_file); +void packet_io_config_free(struct packet_io_config *cfg); +void packet_io_config_print(const struct packet_io_config *cfg); + struct packet; struct packet_io; -struct packet_io *packet_io_new(struct packet_io_options *opts); -void packet_io_free(struct packet_io *packet_io); -int packet_io_isbreak(struct packet_io *packet_io); // used for dumpfile mode +struct packet_io *packet_io_new(const struct packet_io_config *cfg); +void packet_io_free(struct packet_io *pkt_io); +int packet_io_isbreak(struct packet_io *pkt_io); -int packet_io_init(struct packet_io *packet_io, uint16_t thr_idx); -uint16_t packet_io_input(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -void packet_io_output(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -void packet_io_drop(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -uint16_t packet_io_inject(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -void packet_io_yield(struct packet_io *packet_io, uint16_t thr_idx, uint64_t timeout_ms); -struct packet_io_stat *packet_io_stat(struct packet_io *packet_io, uint16_t thr_idx); +int packet_io_init(struct packet_io *pkt_io, uint16_t thr_idx); +uint16_t packet_io_input(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void packet_io_output(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +uint16_t packet_io_inject(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx); +struct packet_io_stat *packet_io_stat(struct packet_io *pkt_io, uint16_t thr_idx); #ifdef __cplusplus } diff --git a/infra/packet_io/dumpfile_io.c b/infra/packet_io/pcap_io.c similarity index 65% rename from infra/packet_io/dumpfile_io.c rename to infra/packet_io/pcap_io.c index ed8fb58..1bf37a0 100644 --- a/infra/packet_io/dumpfile_io.c +++ b/infra/packet_io/pcap_io.c @@ -12,21 +12,19 @@ #include "tuple.h" #include "utils.h" #include "log_private.h" -#include "dumpfile_io.h" +#include "pcap_io.h" #include "packet_dump.h" #include "packet_parser.h" #include "packet_private.h" -#define PACKET_IO_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "dumpfile", format, ##__VA_ARGS__) -#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "dumpfile", format, ##__VA_ARGS__) +#define PCAP_IO_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "pcap io", format, ##__VA_ARGS__) +#define PCAP_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "pcap io", format, ##__VA_ARGS__) #define MAX_PACKET_QUEUE_SIZE (4096 * 1000) -struct dumpfile_io +struct pcap_io { - enum packet_io_mode mode; - uint16_t nr_worker_thread; - char dumpfile_path[256]; + struct packet_io_config cfg; pcap_t *pcap; struct logger *logger; @@ -64,14 +62,14 @@ static struct packet_queue *packet_queue_new(uint32_t size) struct packet_queue *queue = (struct packet_queue *)calloc(1, sizeof(struct packet_queue)); if (queue == NULL) { - PACKET_IO_LOG_ERROR("unable to new packet queue"); + PCAP_IO_LOG_ERROR("unable to new packet queue"); return NULL; } queue->queue = (uint64_t *)calloc(size, sizeof(uint64_t)); if (queue->queue == NULL) { - PACKET_IO_LOG_ERROR("unable to new packet queue"); + PCAP_IO_LOG_ERROR("unable to new packet queue"); free(queue); return NULL; } @@ -103,7 +101,7 @@ static int packet_queue_push(struct packet_queue *queue, void *data) { if (__sync_val_compare_and_swap(&queue->queue[queue->tail], 0, data) != 0) { - PACKET_IO_LOG_ERROR("packet queue is full, retry later"); + PCAP_IO_LOG_ERROR("packet queue is full, retry later"); return -1; } @@ -130,13 +128,13 @@ static void packet_queue_pop(struct packet_queue *queue, void **data) static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes) { - struct dumpfile_io *handle = (struct dumpfile_io *)user; + struct pcap_io *handle = (struct pcap_io *)user; // copy packet data to new memory struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)calloc(1, sizeof(struct pcap_pkt) + h->caplen); if (pcap_pkt == NULL) { - PACKET_IO_LOG_ERROR("unable to alloc packet"); + PCAP_IO_LOG_ERROR("unable to alloc packet"); return; } pcap_pkt->data = (char *)pcap_pkt + sizeof(struct pcap_pkt); @@ -152,13 +150,13 @@ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_ uint64_t hash = packet_ldbc_hash(&pkt, PKT_LDBC_METH_OUTERMOST_INT_EXT_IP, PACKET_DIRECTION_OUTGOING); // push packet to queue - struct packet_queue *queue = handle->queue[hash % handle->nr_worker_thread]; + struct packet_queue *queue = handle->queue[hash % handle->cfg.nr_worker_thread]; while (packet_queue_push(queue, pcap_pkt) == -1) { if (ATOMIC_READ(&handle->io_thread_need_exit)) { free(pcap_pkt); - PACKET_IO_LOG_FATAL("dumpfile io thread need exit"); + PCAP_IO_LOG_FATAL("pcap io thread need exit"); pcap_breakloop(handle->pcap); break; } @@ -167,39 +165,39 @@ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_ if (ATOMIC_READ(&handle->io_thread_need_exit)) { - PACKET_IO_LOG_FATAL("dumpfile io thread need exit"); + PCAP_IO_LOG_FATAL("pcap io thread need exit"); pcap_breakloop(handle->pcap); } } -static int dumpfile_handler(struct dumpfile_io *handle, const char *pcap_file) +static int pcap_io_handler(struct pcap_io *handle, const char *pcap_file) { char resolved_path[256]; char pcap_errbuf[PCAP_ERRBUF_SIZE]; realpath(pcap_file, resolved_path); - PACKET_IO_LOG_FATAL("dumpfile %s in-processing", resolved_path) + PCAP_IO_LOG_FATAL("pcap %s in-processing", resolved_path) handle->pcap = pcap_open_offline(resolved_path, pcap_errbuf); if (handle->pcap == NULL) { - PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); + PCAP_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); return -1; } handle->read_pcap_files++; pcap_loop(handle->pcap, -1, pcap_pkt_handler, (u_char *)handle); pcap_close(handle->pcap); - PACKET_IO_LOG_FATAL("dumpfile %s processed", resolved_path) + PCAP_IO_LOG_FATAL("pcap %s processed", resolved_path) return 0; } -static int all_packet_consumed(struct dumpfile_io *handle) +static int all_packet_consumed(struct pcap_io *handle) { uint64_t consumed_pkts = 0; uint64_t read_pcap_pkts = ATOMIC_READ(&handle->read_pcap_pkts); - for (uint16_t i = 0; i < handle->nr_worker_thread; i++) + for (uint16_t i = 0; i < handle->cfg.nr_worker_thread; i++) { consumed_pkts += ATOMIC_READ(&handle->stat[i].pkts_rx); } @@ -213,32 +211,32 @@ static int all_packet_consumed(struct dumpfile_io *handle) } } -static void *dumpfile_thread(void *arg) +static void *pcap_io_thread(void *arg) { - struct dumpfile_io *handle = (struct dumpfile_io *)arg; + struct pcap_io *handle = (struct pcap_io *)arg; __thread_local_logger = handle->logger; ATOMIC_SET(&handle->io_thread_is_runing, 1); - PACKET_IO_LOG_FATAL("dumpfile io thread is running"); + PCAP_IO_LOG_FATAL("pcap io thread is running"); - if (handle->mode == PACKET_IO_DUMPFILE) + if (handle->cfg.mode == PACKET_IO_PCAPFILE) { - dumpfile_handler(handle, handle->dumpfile_path); + pcap_io_handler(handle, handle->cfg.pcap_path); } - else // PACKET_IO_DUMPFILELIST + else // PACKET_IO_PCAPLIST { FILE *fp = NULL; - if (strcmp(handle->dumpfile_path, "-") == 0) + if (strcmp(handle->cfg.pcap_path, "-") == 0) { - PACKET_IO_LOG_ERROR("dumpfile list is empty, read from stdin"); + PCAP_IO_LOG_ERROR("pcap path is empty, read from stdin"); fp = stdin; } else { - fp = fopen(handle->dumpfile_path, "r"); + fp = fopen(handle->cfg.pcap_path, "r"); if (fp == NULL) { - PACKET_IO_LOG_ERROR("unable to open dumpfile list: %s", handle->dumpfile_path); + PCAP_IO_LOG_ERROR("unable to open pcap path: %s", handle->cfg.pcap_path); goto erro_out; } } @@ -257,14 +255,14 @@ static void *dumpfile_thread(void *arg) *pos = '\0'; } - dumpfile_handler(handle, line); + pcap_io_handler(handle, line); } if (fp != stdin) { fclose(fp); } } - PACKET_IO_LOG_FATAL("dumpfile io thread read all pcap files"); + PCAP_IO_LOG_FATAL("pcap io thread read all pcap files"); erro_out: while (ATOMIC_READ(&handle->io_thread_need_exit) == 0) @@ -277,7 +275,7 @@ erro_out: usleep(1000); // 1ms } - PACKET_IO_LOG_FATAL("dumpfile io thread exit (read_pcap_files: %lu, read_pcap_pkts: %lu)", handle->read_pcap_files, ATOMIC_READ(&handle->read_pcap_pkts)); + PCAP_IO_LOG_FATAL("pcap io thread exit (read_pcap_files: %lu, read_pcap_pkts: %lu)", handle->read_pcap_files, ATOMIC_READ(&handle->read_pcap_pkts)); ATOMIC_SET(&handle->io_thread_is_runing, 0); return NULL; @@ -287,60 +285,59 @@ erro_out: * Public API ******************************************************************************/ -struct dumpfile_io *dumpfile_io_new(const char *dumpfile_path, enum packet_io_mode mode, uint16_t nr_worker_thread) +void *pcap_io_new(const struct packet_io_config *cfg) { pthread_t tid; - struct dumpfile_io *handle = (struct dumpfile_io *)calloc(1, sizeof(struct dumpfile_io)); + struct pcap_io *handle = (struct pcap_io *)calloc(1, sizeof(struct pcap_io)); if (handle == NULL) { - PACKET_IO_LOG_ERROR("unable to allocate memory for dumpfile_io"); + PCAP_IO_LOG_ERROR("unable to allocate memory for pcap_io"); return NULL; } - handle->mode = mode; - handle->nr_worker_thread = nr_worker_thread; handle->logger = __thread_local_logger; - strncpy(handle->dumpfile_path, dumpfile_path, MIN(strlen(dumpfile_path), sizeof(handle->dumpfile_path))); + memcpy(&handle->cfg, cfg, sizeof(struct packet_io_config)); - for (uint16_t i = 0; i < handle->nr_worker_thread; i++) + for (uint16_t i = 0; i < handle->cfg.nr_worker_thread; i++) { handle->queue[i] = packet_queue_new(MAX_PACKET_QUEUE_SIZE); if (handle->queue[i] == NULL) { - PACKET_IO_LOG_ERROR("unable to create packet queue"); + PCAP_IO_LOG_ERROR("unable to create packet queue"); goto error_out; } } - if (pthread_create(&tid, NULL, dumpfile_thread, (void *)handle) != 0) + if (pthread_create(&tid, NULL, pcap_io_thread, (void *)handle) != 0) { - PACKET_IO_LOG_ERROR("unable to create packet io thread"); + PCAP_IO_LOG_ERROR("unable to create pcap io thread"); goto error_out; } return handle; error_out: - dumpfile_io_free(handle); + pcap_io_free(handle); return NULL; } -void dumpfile_io_free(struct dumpfile_io *handle) +void pcap_io_free(void *handle) { - if (handle) + struct pcap_io *pcap_io = (struct pcap_io *)handle; + if (pcap_io) { - ATOMIC_SET(&handle->io_thread_need_exit, 1); + ATOMIC_SET(&pcap_io->io_thread_need_exit, 1); - while (ATOMIC_READ(&handle->io_thread_is_runing)) + while (ATOMIC_READ(&pcap_io->io_thread_is_runing)) { usleep(1000); } struct pcap_pkt *pcap_pkt = NULL; - for (uint16_t i = 0; i < handle->nr_worker_thread; i++) + for (uint16_t i = 0; i < pcap_io->cfg.nr_worker_thread; i++) { while (1) { - packet_queue_pop(handle->queue[i], (void **)&pcap_pkt); + packet_queue_pop(pcap_io->queue[i], (void **)&pcap_pkt); if (pcap_pkt) { free(pcap_pkt); @@ -351,30 +348,33 @@ void dumpfile_io_free(struct dumpfile_io *handle) } } - packet_queue_free(handle->queue[i]); + packet_queue_free(pcap_io->queue[i]); } - free(handle); - handle = NULL; + free(pcap_io); + pcap_io = NULL; } } -int dumpfile_io_isbreak(struct dumpfile_io *handle) +int pcap_io_isbreak(void *handle) { - return ATOMIC_READ(&handle->io_thread_wait_exit); + struct pcap_io *pcap_io = (struct pcap_io *)handle; + + return ATOMIC_READ(&pcap_io->io_thread_wait_exit); } -int dumpfile_io_init(struct dumpfile_io *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) +int pcap_io_init(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) { return 0; } -uint16_t dumpfile_io_input(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +uint16_t pcap_io_input(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { - struct packet_queue *queue = handle->queue[thr_idx]; - struct packet_io_stat *stat = &handle->stat[thr_idx]; + uint16_t nr_packet_parsed = 0; + struct packet *pkt = NULL; struct pcap_pkt *pcap_pkt = NULL; - struct packet *pkt; - uint16_t nr_parsed = 0; + struct pcap_io *pcap_io = (struct pcap_io *)handle; + struct packet_queue *queue = pcap_io->queue[thr_idx]; + struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; for (uint16_t i = 0; i < nr_pkts; i++) { @@ -391,24 +391,25 @@ uint16_t dumpfile_io_input(struct dumpfile_io *handle, uint16_t thr_idx, struct stat->raw_pkts_rx++; stat->raw_bytes_rx += pcap_pkt->len; - pkt = &pkts[nr_parsed]; + pkt = &pkts[nr_packet_parsed]; packet_parse(pkt, pcap_pkt->data, pcap_pkt->len); memset(&pkt->meta, 0, sizeof(pkt->meta)); packet_set_origin_ctx(pkt, pcap_pkt); packet_set_action(pkt, PACKET_ACTION_FORWARD); packet_set_timeval(pkt, &pcap_pkt->ts); - nr_parsed++; + nr_packet_parsed++; } } - return nr_parsed; + return nr_packet_parsed; } -void dumpfile_io_output(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +void pcap_io_output(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { int len; struct packet *pkt = NULL; - struct packet_io_stat *stat = &handle->stat[thr_idx]; + struct pcap_io *pcap_io = (struct pcap_io *)handle; + struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; for (uint16_t i = 0; i < nr_pkts; i++) { @@ -430,10 +431,11 @@ void dumpfile_io_output(struct dumpfile_io *handle, uint16_t thr_idx, struct pac } } -void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { struct packet *pkt = NULL; - struct packet_io_stat *stat = &handle->stat[thr_idx]; + struct pcap_io *pcap_io = (struct pcap_io *)handle; + struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; for (uint16_t i = 0; i < nr_pkts; i++) { @@ -449,14 +451,15 @@ void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packe } } -uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) +uint16_t pcap_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { uint16_t len; - struct packet *pkt = NULL; - struct packet_io_stat *stat = &handle->stat[thr_idx]; struct tuple6 tuple; + struct packet *pkt = NULL; + struct pcap_io *pcap_io = (struct pcap_io *)handle; + struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; - char file[1024] = {0}; + char file[PATH_MAX] = {0}; char src_addr[INET6_ADDRSTRLEN] = {0}; char dst_addr[INET6_ADDRSTRLEN] = {0}; @@ -491,11 +494,11 @@ uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct if (packet_dump_pcap(pkt, file) == -1) { - PACKET_IO_LOG_ERROR("unable to dump pcap file: %s", file); + PCAP_IO_LOG_ERROR("unable to dump pcap file: %s", file); } else { - PACKET_IO_LOG_FATAL("dump inject packet: %s", file); + PCAP_IO_LOG_FATAL("dump inject packet: %s", file); } packet_free(pkt); } @@ -503,12 +506,14 @@ uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct return nr_pkts; } -void dumpfile_io_yield(struct dumpfile_io *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused)), uint64_t timeout_ms __attribute__((unused))) +void pcap_io_yield(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) { return; } -struct packet_io_stat *dumpfile_io_stat(struct dumpfile_io *handle, uint16_t thr_idx) +struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx) { - return &handle->stat[thr_idx]; + struct pcap_io *pcap_io = (struct pcap_io *)handle; + + return &pcap_io->stat[thr_idx]; } \ No newline at end of file diff --git a/infra/packet_io/pcap_io.h b/infra/packet_io/pcap_io.h new file mode 100644 index 0000000..7c690c4 --- /dev/null +++ b/infra/packet_io/pcap_io.h @@ -0,0 +1,24 @@ +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "packet_io.h" + +void *pcap_io_new(const struct packet_io_config *cfg); +void pcap_io_free(void *handle); +int pcap_io_isbreak(void *handle); + +int pcap_io_init(void *handle, uint16_t thr_idx); +uint16_t pcap_io_input(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void pcap_io_output(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +uint16_t pcap_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void pcap_io_yield(void *handle, uint16_t thr_idx); +struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx); + +#ifdef __cplusplus +} +#endif diff --git a/test/decoders/http/test_based_on_stellar/env/stellar.toml b/test/decoders/http/test_based_on_stellar/env/stellar.toml index 210ce3d..b472a69 100644 --- a/test/decoders/http/test_based_on_stellar/env/stellar.toml +++ b/test/decoders/http/test_based_on_stellar/env/stellar.toml @@ -3,15 +3,13 @@ snowflake_base = 1 # [0, 31] snowflake_offset = 2 # [0, 127] [packet_io] -mode = "dumpfile" # dumpfile, dumpfilelist, marsio +mode = "pcapfile" # pcapfile, pcaplist, marsio app_symbol = "stellar" dev_symbol = "nf_0_fw" - -dumpfile_path = "./pcap/test.pcap" -#dumpfile_path = "/tmp/dumpfile/dumpfilelist" - -nr_worker_thread = 1 # [1, 256] +pcap_path = "./pcap/test.pcap" +nr_worker_thread = 1 # range: [1, 256] cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12] +idle_yield_interval_ms = 900 # range: [0, 60000] (ms) [ip_reassembly] enable = 1 @@ -67,5 +65,3 @@ tcp_reassembly_max_segments = 256 # range: [2, 4096] [schedule] merge_stat_interval = 50 # range: [1, 60000] (ms) output_stat_interval = 10 # range: [1, 60000] (ms) - -packet_io_yield_interval = 90 # range: [1, 60000] (ms) diff --git a/test/lpi_plugin/CMakeLists.txt b/test/lpi_plugin/CMakeLists.txt index 116ad8f..813451f 100644 --- a/test/lpi_plugin/CMakeLists.txt +++ b/test/lpi_plugin/CMakeLists.txt @@ -23,8 +23,8 @@ add_test(NAME ${TEST_NAME}.SETUP COMMAND sh -c " cp ${CMAKE_CURRENT_SOURCE_DIR}/test_config/spec.toml ${CMAKE_CURRENT_BINARY_DIR}/plugin/ && cp ${CMAKE_SOURCE_DIR}/conf/log.toml ${CMAKE_CURRENT_BINARY_DIR}/conf/ && cp ${CMAKE_CURRENT_SOURCE_DIR}/test_config/tsg_l7_protocol.conf ${CMAKE_CURRENT_BINARY_DIR}/tsgconf/ && - tomlq -t -i '.packet_io.dumpfile_path=\"-\"' ${CMAKE_CURRENT_BINARY_DIR}/conf/stellar.toml && - tomlq -t -i '.packet_io.mode=\"dumpfilelist\"' ${CMAKE_CURRENT_BINARY_DIR}/conf/stellar.toml + tomlq -t -i '.packet_io.pcap_path=\"-\"' ${CMAKE_CURRENT_BINARY_DIR}/conf/stellar.toml && + tomlq -t -i '.packet_io.mode=\"pcaplist\"' ${CMAKE_CURRENT_BINARY_DIR}/conf/stellar.toml ") diff --git a/test/packet_inject/conf/stellar.toml b/test/packet_inject/conf/stellar.toml index d039d0d..4791358 100644 --- a/test/packet_inject/conf/stellar.toml +++ b/test/packet_inject/conf/stellar.toml @@ -3,15 +3,13 @@ snowflake_base = 1 # [0, 31] snowflake_offset = 2 # [0, 127] [packet_io] -mode = "dumpfile" # dumpfile, dumpfilelist, marsio +mode = "pcapfile" # pcapfile, pcaplist, marsio app_symbol = "stellar" dev_symbol = "nf_0_fw" - -dumpfile_path = "/tmp/dumpfile/dumpfile.pcap" -#dumpfile_path = "/tmp/dumpfile/dumpfilelist" - -nr_worker_thread = 1 # [1, 256] +pcap_path = "/tmp/test.pcap" +nr_worker_thread = 1 # range: [1, 256] cpu_mask = [5] +idle_yield_interval_ms = 900 # range: [0, 60000] (ms) [ip_reassembly] enable = 1 @@ -67,5 +65,3 @@ tcp_reassembly_max_segments = 128 # range: [2, 4096] [schedule] merge_stat_interval = 50 # range: [1, 60000] (ms) output_stat_interval = 2000 # range: [1, 60000] (ms) - -packet_io_yield_interval = 900 # range: [1, 60000] (ms) diff --git a/test/packet_inject/packet_inject_test.h b/test/packet_inject/packet_inject_test.h index 3728840..dc8bf79 100644 --- a/test/packet_inject/packet_inject_test.h +++ b/test/packet_inject/packet_inject_test.h @@ -145,8 +145,8 @@ static inline void expect_cmp_inject(const char *expect_pcap_file, const char *i static inline void packet_inject_test(struct packet_inject_case *test) { // create directory - char dumpfile_path[PATH_MAX] = {0}; - snprintf(dumpfile_path, sizeof(dumpfile_path), "%s/input/%s", test->work_dir, test->input_pcap); + char pcap_path[PATH_MAX] = {0}; + snprintf(pcap_path, sizeof(pcap_path), "%s/input/%s", test->work_dir, test->input_pcap); system_cmd("rm -rf %s", test->work_dir); system_cmd("mkdir -p %s/input/", test->work_dir); system_cmd("mkdir -p %s/log/", test->work_dir); @@ -173,9 +173,9 @@ static inline void packet_inject_test(struct packet_inject_case *test) char temp[PATH_MAX * 2] = {0}; getcwd(cwd, sizeof(cwd)); chdir(test->work_dir); - snprintf(temp, sizeof(temp), "dumpfile_path = \"%s\"", dumpfile_path); - EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "mode = marsio", "mode = dumpfile") == 0); - EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "dumpfile_path = \"/tmp/dumpfile/dumpfile.pcap\"", temp) == 0); + snprintf(temp, sizeof(temp), "pcap_path = \"%s\"", pcap_path); + EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "mode = marsio", "mode = pcapfile") == 0); + EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "pcap_path = \"/tmp/test.pcap\"", temp) == 0); const char *stellar_cfg_file = "./conf/stellar.toml"; const char *plugin_cfg_file = "./plugin/spec.toml";