diff --git a/conf/stellar.toml b/conf/stellar.toml index 9454cb6..808de63 100644 --- a/conf/stellar.toml +++ b/conf/stellar.toml @@ -6,6 +6,7 @@ app_symbol = "stellar" dev_symbol = "nf_0_fw" pcap_path = "/tmp/test.pcap" + pcap_done_exit = 1 # range: [0, 1] thread_num = 1 # range: [1, 256] cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12] idle_yield_ms = 900 # range: [0, 60000] (ms) diff --git a/infra/ip_reassembly/ip_reassembly.c b/infra/ip_reassembly/ip_reassembly.c index dd4e749..e7224c8 100644 --- a/infra/ip_reassembly/ip_reassembly.c +++ b/infra/ip_reassembly/ip_reassembly.c @@ -752,3 +752,15 @@ void ip_reassembly_print_stat(struct ip_reassembly *ip_reass) IP_REASSEMBLY_LOG_INFO("ip_reass: %p, ip6_frags_too_many : %lu", ip_reass, ip_reass->stat.ip6_frags_too_many); } } + +uint64_t ip_reassembly_stat_get(struct ip_reassembly_stat *stat, enum ip_reass_stat_type type) +{ + switch (type) + { +#define XX(_type, _name) case _type: return stat->_name; + IP_REASS_STAT_MAP(XX) +#undef XX + default: + return 0; + } +} \ No newline at end of file diff --git a/infra/ip_reassembly/ip_reassembly.h b/infra/ip_reassembly/ip_reassembly.h index 572ede8..25ecb6a 100644 --- a/infra/ip_reassembly/ip_reassembly.h +++ b/infra/ip_reassembly/ip_reassembly.h @@ -38,6 +38,45 @@ struct ip_reassembly_stat uint64_t ip6_frags_too_many; } __attribute__((aligned(64))); +#define IP_REASS_STAT_MAP(XX) \ + XX(IP_REASS_STAT_IP4_DEFRAGS_EXPECTED, ip4_defrags_expected) \ + XX(IP_REASS_STAT_IP4_DEFRAGS_SUCCEED, ip4_defrags_succeed) \ + XX(IP_REASS_STAT_IP4_DEFRAGS_FAILED, ip4_defrags_failed) \ + XX(IP_REASS_STAT_IP4_FRAGS, ip4_frags) \ + XX(IP_REASS_STAT_IP4_FRAGS_FREED, ip4_frags_freed) \ + XX(IP_REASS_STAT_IP4_FRAGS_BUFFERED, ip4_frags_buffered) \ + XX(IP_REASS_STAT_IP4_FRAGS_NO_BUFFER, ip4_frags_no_buffer) \ + XX(IP_REASS_STAT_IP4_FRAGS_TIMEOUT, ip4_frags_timeout) \ + XX(IP_REASS_STAT_IP4_FRAGS_INVALID_LENGTH, ip4_frags_invalid_length) \ + XX(IP_REASS_STAT_IP4_FRAGS_OVERLAP, ip4_frags_overlap) \ + XX(IP_REASS_STAT_IP4_FRAGS_TOO_MANY, ip4_frags_too_many) \ + XX(IP_REASS_STAT_IP6_DEFRAGS_EXPECTED, ip6_defrags_expected) \ + XX(IP_REASS_STAT_IP6_DEFRAGS_SUCCEED, ip6_defrags_succeed) \ + XX(IP_REASS_STAT_IP6_DEFRAGS_FAILED, ip6_defrags_failed) \ + XX(IP_REASS_STAT_IP6_FRAGS, ip6_frags) \ + XX(IP_REASS_STAT_IP6_FRAGS_FREED, ip6_frags_freed) \ + XX(IP_REASS_STAT_IP6_FRAGS_BUFFERED, ip6_frags_buffered) \ + XX(IP_REASS_STAT_IP6_FRAGS_NO_BUFFER, ip6_frags_no_buffer) \ + XX(IP_REASS_STAT_IP6_FRRAGS_TIMEOUT, ip6_frags_timeout) \ + XX(IP_REASS_STAT_IP6_FRAGS_INVALID_LENGTH, ip6_frags_invalid_length) \ + XX(IP_REASS_STAT_IP6_FRAGS_OVERLAP, ip6_frags_overlap) \ + XX(IP_REASS_STAT_IP6_FRAGS_TOO_MANY, ip6_frags_too_many) + +enum ip_reass_stat_type +{ +#define XX(type, name) type, + IP_REASS_STAT_MAP(XX) +#undef XX + IP_REASS_STAT_MAX +}; + +__attribute__((unused)) static const char ip_reass_stat_str[IP_REASS_STAT_MAX][64] = +{ +#define XX(type, name) #name, + IP_REASS_STAT_MAP(XX) +#undef XX +}; + struct ip_reassembly *ip_reassembly_new(uint64_t timeout_ms, uint64_t frag_queue_num, uint64_t frag_queue_size); void ip_reassembly_free(struct ip_reassembly *ip_reass); @@ -47,6 +86,8 @@ struct packet *ip_reassembly_clean(struct ip_reassembly *ip_reass, uint64_t now_ struct ip_reassembly_stat *ip_reassembly_get_stat(struct ip_reassembly *ip_reass); void ip_reassembly_print_stat(struct ip_reassembly *ip_reass); +uint64_t ip_reassembly_stat_get(struct ip_reassembly_stat *stat, enum ip_reass_stat_type type); + #ifdef __cplusplus } #endif diff --git a/infra/packet_io/CMakeLists.txt b/infra/packet_io/CMakeLists.txt index dd094f1..0164184 100644 --- a/infra/packet_io/CMakeLists.txt +++ b/infra/packet_io/CMakeLists.txt @@ -1,5 +1,5 @@ add_library(packet_io pcap_io.c mars_io.c packet_io.c) target_include_directories(packet_io PUBLIC ${CMAKE_CURRENT_LIST_DIR}) -target_link_libraries(packet_io marsio pcap packet_manager ip_reassembly) +target_link_libraries(packet_io marsio pcap packet_manager ip_reassembly fieldstat4) add_subdirectory(test) \ No newline at end of file diff --git a/infra/packet_io/mars_io.c b/infra/packet_io/mars_io.c index 7dd2904..5f3a0d9 100644 --- a/infra/packet_io/mars_io.c +++ b/infra/packet_io/mars_io.c @@ -5,30 +5,19 @@ #include "mars_io.h" #include "packet_pool.h" #include "packet_parser.h" -#include "ip_reassembly.h" -#include "log_internal.h" #include "utils_internal.h" #include "packet_internal.h" -#define MARS_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "mars IO", format, ##__VA_ARGS__) -#define MARS_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "mars IO", format, ##__VA_ARGS__) - struct mars_io_cfg { char app_symbol[64]; char dev_symbol[64]; - uint16_t thread_num; // range [1, MAX_THREAD_NUM] + uint64_t thread_num; // range [1, MAX_THREAD_NUM] uint64_t cpu_mask[MAX_THREAD_NUM]; uint64_t idle_yield_ms; // range: [0, 6000] (ms) // packet pool uint64_t capacity; // range: [1, 4294967295] - - // ip reassembly - uint64_t fail_action; // 0: bypass, 1: drop - uint64_t timeout_ms; // range: [1, 60000] (ms) - uint64_t frag_queue_num; // range: [1, 4294967295 - uint64_t frag_queue_size; // range: [2, 65535] }; struct mars_io @@ -40,7 +29,6 @@ struct mars_io struct mr_sendpath *mr_path; struct packet_pool *pool[MAX_THREAD_NUM]; struct packet_io_stat stat[MAX_THREAD_NUM]; - struct ip_reassembly *ip_reass[MAX_THREAD_NUM]; }; /****************************************************************************** @@ -59,15 +47,10 @@ static struct mars_io_cfg *mars_io_cfg_new(const char *toml_file) int num = 0; ret += load_toml_str_config(toml_file, "packet_io.app_symbol", cfg->app_symbol); ret += load_toml_str_config(toml_file, "packet_io.dev_symbol", cfg->dev_symbol); - ret += load_toml_integer_config(toml_file, "packet_io.thread_num", (uint64_t *)&cfg->thread_num, 1, MAX_THREAD_NUM); + ret += load_toml_integer_config(toml_file, "packet_io.thread_num", &cfg->thread_num, 1, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "packet_io.idle_yield_ms", &cfg->idle_yield_ms, 0, 60000); num = load_toml_array_config(toml_file, "packet_io.cpu_mask", cfg->cpu_mask, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "packet_io.packet_pool.capacity", &cfg->capacity, 1, 4294967295); - ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.fail_action", &cfg->fail_action, 0, 1); - ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.timeout_ms", &cfg->timeout_ms, 1, 60000); - ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_num", &cfg->frag_queue_num, 1, 4294967295); - ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_size", &cfg->frag_queue_size, 2, 65535); - if (ret != 0 || num != (int)cfg->thread_num) { free(cfg); @@ -92,24 +75,19 @@ static void mars_io_cfg_print(const struct mars_io_cfg *cfg) { if (cfg) { - MARS_IO_LOG_INFO("packet_io.app_symbol : %s", cfg->app_symbol); - MARS_IO_LOG_INFO("packet_io.dev_symbol : %s", cfg->dev_symbol); - MARS_IO_LOG_INFO("packet_io.idle_yield_ms : %lu", cfg->idle_yield_ms); - MARS_IO_LOG_INFO("packet_io.thread_num : %lu", cfg->thread_num); + PACKET_IO_LOG_INFO("packet_io.app_symbol : %s", cfg->app_symbol); + PACKET_IO_LOG_INFO("packet_io.dev_symbol : %s", cfg->dev_symbol); + PACKET_IO_LOG_INFO("packet_io.idle_yield_ms : %lu", cfg->idle_yield_ms); for (uint64_t i = 0; i < cfg->thread_num; i++) { - MARS_IO_LOG_INFO("packet_io.cpu_mask[%03d] : %d", i, cfg->cpu_mask[i]); + PACKET_IO_LOG_INFO("packet_io.cpu_mask[%03d] : %d", i, cfg->cpu_mask[i]); } - MARS_IO_LOG_INFO("packet_io.packet_pool.capacity : %lu", cfg->capacity); - MARS_IO_LOG_INFO("packet_io.ip_reassembly.fail_action : %lu", cfg->fail_action); - MARS_IO_LOG_INFO("packet_io.ip_reassembly.timeout_ms : %lu", cfg->timeout_ms); - MARS_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_num : %lu", cfg->frag_queue_num); - MARS_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_size : %lu", cfg->frag_queue_size); + PACKET_IO_LOG_INFO("packet_io.packet_pool.capacity : %lu", cfg->capacity); } } -static void packet_set_metadata(struct packet *pkt, marsio_buff_t *mbuff) +static void copy_metadata_to_packet(marsio_buff_t *mbuff, struct packet *pkt) { struct route_ctx route_ctx = {}; route_ctx.used = marsio_buff_get_metadata(mbuff, MR_BUFF_ROUTE_CTX, &route_ctx.data, sizeof(route_ctx.data)); @@ -119,7 +97,7 @@ static void packet_set_metadata(struct packet *pkt, marsio_buff_t *mbuff) } else { - MARS_IO_LOG_ERROR("failed to get route ctx"); + PACKET_IO_LOG_ERROR("failed to get route ctx"); } struct sids sids = {}; @@ -130,7 +108,7 @@ static void packet_set_metadata(struct packet *pkt, marsio_buff_t *mbuff) } else { - MARS_IO_LOG_ERROR("failed to get sids"); + PACKET_IO_LOG_ERROR("failed to get sids"); } uint64_t session_id = 0; @@ -140,7 +118,7 @@ static void packet_set_metadata(struct packet *pkt, marsio_buff_t *mbuff) } else { - MARS_IO_LOG_ERROR("failed to get session id"); + PACKET_IO_LOG_ERROR("failed to get session id"); } // TODO @@ -151,7 +129,7 @@ static void packet_set_metadata(struct packet *pkt, marsio_buff_t *mbuff) } else { - MARS_IO_LOG_ERROR("failed to get domain id"); + PACKET_IO_LOG_ERROR("failed to get domain id"); } #endif @@ -162,7 +140,7 @@ static void packet_set_metadata(struct packet *pkt, marsio_buff_t *mbuff) } else { - MARS_IO_LOG_ERROR("failed to get link id"); + PACKET_IO_LOG_ERROR("failed to get link id"); } packet_set_ctrl(pkt, marsio_buff_is_ctrlbuf(mbuff)); @@ -174,7 +152,7 @@ static void packet_set_metadata(struct packet *pkt, marsio_buff_t *mbuff) } else { - MARS_IO_LOG_ERROR("failed to get direction"); + PACKET_IO_LOG_ERROR("failed to get direction"); } packet_set_action(pkt, PACKET_ACTION_FORWARD); @@ -184,24 +162,24 @@ static void packet_set_metadata(struct packet *pkt, marsio_buff_t *mbuff) packet_set_timeval(pkt, &tv); } -static void mbuff_set_metadata(marsio_buff_t *mbuff, struct packet *pkt) +static void copy_metadata_to_mbuff(struct packet *pkt, marsio_buff_t *mbuff) { const struct route_ctx *route_ctx = packet_get_route_ctx(pkt); if (marsio_buff_set_metadata(mbuff, MR_BUFF_ROUTE_CTX, (void *)route_ctx->data, route_ctx->used) != 0) { - MARS_IO_LOG_ERROR("failed to set route ctx"); + PACKET_IO_LOG_ERROR("failed to set route ctx"); } const struct sids *sids = packet_get_sids(pkt); if (marsio_buff_set_sid_list(mbuff, (sid_t *)sids->sid, sids->used) != 0) { - MARS_IO_LOG_ERROR("failed to set sids"); + PACKET_IO_LOG_ERROR("failed to set sids"); } uint64_t session_id = packet_get_session_id(pkt); if (marsio_buff_set_metadata(mbuff, MR_BUFF_SESSION_ID, &session_id, sizeof(session_id)) != 0) { - MARS_IO_LOG_ERROR("failed to set session id"); + PACKET_IO_LOG_ERROR("failed to set session id"); } // TODO @@ -209,14 +187,14 @@ static void mbuff_set_metadata(marsio_buff_t *mbuff, struct packet *pkt) uint64_t domain = packet_get_domain(pkt); if (marsio_buff_set_metadata(mbuff, MR_BUFF_DOMAIN, &domain, sizeof(domain)) != 0) { - MARS_IO_LOG_ERROR("failed to set domain id"); + PACKET_IO_LOG_ERROR("failed to set domain id"); } #endif uint16_t link_id = packet_get_link_id(pkt); if (marsio_buff_set_metadata(mbuff, MR_BUFF_LINK_ID, &link_id, sizeof(link_id)) != 0) { - MARS_IO_LOG_ERROR("failed to set link id"); + PACKET_IO_LOG_ERROR("failed to set link id"); } if (packet_is_ctrl(pkt)) @@ -227,7 +205,7 @@ static void mbuff_set_metadata(marsio_buff_t *mbuff, struct packet *pkt) enum packet_direction direction = packet_get_direction(pkt); if (marsio_buff_set_metadata(mbuff, MR_BUFF_DIR, &direction, sizeof(direction)) != 0) { - MARS_IO_LOG_ERROR("failed to set direction"); + PACKET_IO_LOG_ERROR("failed to set direction"); } } @@ -264,115 +242,6 @@ static void origin_free_cb(struct packet *pkt, void *args) packet_pool_push(pool, pkt); } -static struct packet *recv_packet(struct mars_io *mars_io, marsio_buff_t *mbuff, uint16_t thr_idx) -{ - struct packet_io_stat *stat = &mars_io->stat[thr_idx]; - struct ip_reassembly *ip_reass = mars_io->ip_reass[thr_idx]; - struct packet_pool *pool = mars_io->pool[thr_idx]; - int len = marsio_buff_datalen(mbuff); - char *data = marsio_buff_mtod(mbuff); - - stat->pkts_rx++; - stat->bytes_rx += len; - - if (is_keepalive_packet(data, len)) - { - stat->keep_alive_pkts++; - stat->keep_alive_bytes += len; - - stat->pkts_tx++; - stat->bytes_tx += len; - marsio_send_burst(mars_io->mr_path, thr_idx, &mbuff, 1); - return NULL; - } - else - { - struct packet *pkt = packet_pool_pop(pool); - assert(pkt != NULL); - struct packet_origin origin = { - .type = ORIGIN_TYPE_MR, - .ctx = mbuff, - .cb = origin_free_cb, - .args = mars_io, - .thr_idx = thr_idx, - }; - packet_parse(pkt, data, len); - packet_set_metadata(pkt, mbuff); - packet_set_origin(pkt, &origin); - - if (packet_is_fragment(pkt)) - { - return ip_reassembly_defrag(ip_reass, pkt, clock_get_real_time_ms()); - } - else - { - return pkt; - } - } -} - -static void send_packet(struct mars_io *mars_io, struct packet *pkt, uint16_t thr_idx) -{ - marsio_buff_t *mbuff = NULL; - struct packet_io_stat *stat = &mars_io->stat[thr_idx]; - int len = packet_get_raw_len(pkt); - struct packet_origin *origin = packet_get_origin(pkt); - - // TODO check len vs MTU, if len > MTU, fragment it - - if (origin->type == ORIGIN_TYPE_MR) - { - mbuff = (marsio_buff_t *)origin->ctx; - mbuff_set_metadata(mbuff, pkt); - marsio_send_burst(mars_io->mr_path, thr_idx, &mbuff, 1); - packet_pool_push(mars_io->pool[thr_idx], pkt); - } - else - { - if (marsio_buff_malloc_global(mars_io->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0) - { - MARS_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet"); - packet_free(pkt); - return; - } - else - { - stat->pkts_injected++; - stat->bytes_injected += len; - - char *ptr = marsio_buff_append(mbuff, len); - memcpy(ptr, packet_get_raw_data(pkt), len); - mbuff_set_metadata(mbuff, pkt); - marsio_send_burst_with_options(mars_io->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH); - packet_free(pkt); - } - } - - stat->pkts_tx++; - stat->bytes_tx += len; -} - -static void drop_packet(struct mars_io *mars_io, struct packet *pkt, uint16_t thr_idx) -{ - struct packet_io_stat *stat = &mars_io->stat[thr_idx]; - int len = packet_get_raw_len(pkt); - struct packet_origin *origin = packet_get_origin(pkt); - - stat->pkts_dropped++; - stat->bytes_dropped += len; - - if (origin->type == ORIGIN_TYPE_MR) - { - marsio_buff_t *mbuff = (marsio_buff_t *)origin->ctx; - marsio_buff_free(mars_io->mr_ins, &mbuff, 1, 0, thr_idx); - packet_pool_push(mars_io->pool[thr_idx], pkt); - } - else - { - packet_free(pkt); - } -} - /****************************************************************************** * Public API ******************************************************************************/ @@ -386,14 +255,14 @@ void *mars_io_new(const char *toml_file) struct mars_io *mars_io = (struct mars_io *)calloc(1, sizeof(struct mars_io)); if (mars_io == NULL) { - MARS_IO_LOG_ERROR("unable to allocate memory for mars_io"); + PACKET_IO_LOG_ERROR("unable to allocate memory for mars_io"); return NULL; } mars_io->cfg = mars_io_cfg_new(toml_file); if (mars_io->cfg == NULL) { - MARS_IO_LOG_ERROR("unable to create mars_io_cfg"); + PACKET_IO_LOG_ERROR("unable to create mars_io_cfg"); goto error_out; } mars_io_cfg_print(mars_io->cfg); @@ -406,7 +275,7 @@ void *mars_io_new(const char *toml_file) mars_io->mr_ins = marsio_create(); if (mars_io->mr_ins == NULL) { - MARS_IO_LOG_ERROR("unable to create marsio instance"); + PACKET_IO_LOG_ERROR("unable to create marsio instance"); goto error_out; } @@ -414,21 +283,21 @@ void *mars_io_new(const char *toml_file) marsio_option_set(mars_io->mr_ins, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)); if (marsio_init(mars_io->mr_ins, mars_io->cfg->app_symbol) != 0) { - MARS_IO_LOG_ERROR("unable to init marsio instance"); + PACKET_IO_LOG_ERROR("unable to init marsio instance"); goto error_out; } mars_io->mr_dev = marsio_open_device(mars_io->mr_ins, mars_io->cfg->dev_symbol, mars_io->cfg->thread_num, mars_io->cfg->thread_num); if (mars_io->mr_dev == NULL) { - MARS_IO_LOG_ERROR("unable to open marsio device"); + PACKET_IO_LOG_ERROR("unable to open marsio device"); goto error_out; } mars_io->mr_path = marsio_sendpath_create_by_vdev(mars_io->mr_dev); if (mars_io->mr_path == NULL) { - MARS_IO_LOG_ERROR("unable to create marsio sendpath"); + PACKET_IO_LOG_ERROR("unable to create marsio sendpath"); goto error_out; } for (uint64_t i = 0; i < mars_io->cfg->thread_num; i++) @@ -436,13 +305,7 @@ void *mars_io_new(const char *toml_file) mars_io->pool[i] = packet_pool_new(mars_io->cfg->capacity); if (mars_io->pool[i] == NULL) { - MARS_IO_LOG_ERROR("unable to create packet pool"); - goto error_out; - } - mars_io->ip_reass[i] = ip_reassembly_new(mars_io->cfg->timeout_ms, mars_io->cfg->frag_queue_num, mars_io->cfg->frag_queue_size); - if (mars_io->ip_reass[i] == NULL) - { - MARS_IO_LOG_ERROR("unable to create ip reassembly"); + PACKET_IO_LOG_ERROR("unable to create packet pool"); goto error_out; } } @@ -461,7 +324,6 @@ void mars_io_free(void *handle) { for (uint64_t i = 0; i < mars_io->cfg->thread_num; i++) { - ip_reassembly_free(mars_io->ip_reass[i]); packet_pool_free(mars_io->pool[i]); } @@ -489,7 +351,7 @@ void mars_io_free(void *handle) } } -int mars_io_isbreak(void *handle __attribute__((unused))) +int mars_io_is_done(void *handle __attribute__((unused))) { return 0; } @@ -499,7 +361,7 @@ int mars_io_init(void *handle, uint16_t thr_idx __attribute__((unused))) struct mars_io *mars_io = (struct mars_io *)handle; if (marsio_thread_init(mars_io->mr_ins) != 0) { - MARS_IO_LOG_ERROR("unable to init marsio thread"); + PACKET_IO_LOG_ERROR("unable to init marsio thread"); return -1; } else @@ -510,19 +372,51 @@ int mars_io_init(void *handle, uint16_t thr_idx __attribute__((unused))) int mars_io_recv(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { + int len = 0; + char *data = NULL; struct packet *pkt = NULL; marsio_buff_t *mbuff = NULL; marsio_buff_t *mbuffs[RX_BURST_MAX]; struct mars_io *mars_io = (struct mars_io *)handle; + struct packet_pool *pool = mars_io->pool[thr_idx]; + struct packet_io_stat *stat = &mars_io->stat[thr_idx]; int ret = 0; int nr_recv = marsio_recv_burst(mars_io->mr_dev, thr_idx, mbuffs, MIN(RX_BURST_MAX, nr_pkts)); for (int i = 0; i < nr_recv; i++) { mbuff = mbuffs[i]; - pkt = recv_packet(mars_io, mbuff, thr_idx); - if (pkt) + + len = marsio_buff_datalen(mbuff); + data = marsio_buff_mtod(mbuff); + + stat->pkts_rx++; + stat->bytes_rx += len; + + if (is_keepalive_packet(data, len)) { + stat->keep_alive_pkts++; + stat->keep_alive_bytes += len; + + stat->pkts_tx++; + stat->bytes_tx += len; + marsio_send_burst(mars_io->mr_path, thr_idx, &mbuff, 1); + } + else + { + pkt = packet_pool_pop(pool); + assert(pkt != NULL); + struct packet_origin origin = { + .type = ORIGIN_TYPE_MR, + .ctx = mbuff, + .cb = origin_free_cb, + .args = mars_io, + .thr_idx = thr_idx, + }; + packet_parse(pkt, data, len); + copy_metadata_to_packet(mbuff, pkt); + packet_set_origin(pkt, &origin); + pkts[ret++] = pkt; } } @@ -532,52 +426,84 @@ int mars_io_recv(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_p void mars_io_send(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { - struct packet *frag = NULL; + int len = 0; struct packet *pkt = NULL; + marsio_buff_t *mbuff = NULL; + struct packet_origin *origin = NULL; struct mars_io *mars_io = (struct mars_io *)handle; + struct packet_io_stat *stat = &mars_io->stat[thr_idx]; for (int i = 0; i < nr_pkts; i++) { pkt = pkts[i]; - if (packet_is_defraged(pkt)) + len = packet_get_raw_len(pkt); + origin = packet_get_origin(pkt); + + if (origin->type == ORIGIN_TYPE_MR) { - while ((frag = packet_pop_frag(pkt))) - { - send_packet(mars_io, frag, thr_idx); - } - packet_free(pkt); + mbuff = (marsio_buff_t *)origin->ctx; + copy_metadata_to_mbuff(pkt, mbuff); + marsio_send_burst(mars_io->mr_path, thr_idx, &mbuff, 1); + packet_pool_push(mars_io->pool[thr_idx], pkt); } else { - send_packet(mars_io, pkt, thr_idx); + if (marsio_buff_malloc_global(mars_io->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0) + { + PACKET_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet"); + packet_free(pkt); + } + else + { + stat->pkts_injected++; + stat->bytes_injected += len; + + char *ptr = marsio_buff_append(mbuff, len); + memcpy(ptr, packet_get_raw_data(pkt), len); + copy_metadata_to_mbuff(pkt, mbuff); + marsio_send_burst_with_options(mars_io->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH); + packet_free(pkt); + } } + + stat->pkts_tx++; + stat->bytes_tx += len; + pkts[i] = NULL; } } void mars_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { + int len = 0; struct packet *pkt = NULL; - struct packet *frag = NULL; + marsio_buff_t *mbuff = NULL; + struct packet_origin *origin = NULL; struct mars_io *mars_io = (struct mars_io *)handle; + struct packet_io_stat *stat = &mars_io->stat[thr_idx]; for (int i = 0; i < nr_pkts; i++) { pkt = pkts[i]; - if (packet_is_defraged(pkt)) + len = packet_get_raw_len(pkt); + origin = packet_get_origin(pkt); + + stat->pkts_dropped++; + stat->bytes_dropped += len; + + if (origin->type == ORIGIN_TYPE_MR) { - while ((frag = packet_pop_frag(pkt))) - { - drop_packet(mars_io, frag, thr_idx); - } - packet_free(pkt); + mbuff = (marsio_buff_t *)origin->ctx; + marsio_buff_free(mars_io->mr_ins, &mbuff, 1, 0, thr_idx); + packet_pool_push(mars_io->pool[thr_idx], pkt); } else { - drop_packet(mars_io, pkt, thr_idx); + packet_free(pkt); } + pkts[i] = NULL; } } @@ -592,31 +518,8 @@ void mars_io_yield(void *handle, uint16_t thr_idx) marsio_poll_wait(mars_io->mr_ins, vdevs, 1, thr_idx, mars_io->cfg->idle_yield_ms); } -void mars_io_polling(void *handle, uint16_t thr_idx) -{ - struct mars_io *mars_io = (struct mars_io *)handle; - struct ip_reassembly *ip_reass = mars_io->ip_reass[thr_idx]; - struct packet *pkt = NULL; - uint64_t now_ms = clock_get_real_time_ms(); - - while ((pkt = ip_reassembly_clean(ip_reass, now_ms))) - { - if (mars_io->cfg->fail_action == 0) - { - send_packet(mars_io, pkt, thr_idx); - } - else - { - drop_packet(mars_io, pkt, thr_idx); - } - } - - // TODO - // output stat -} - struct packet_io_stat *mars_io_stat(void *handle, uint16_t thr_idx) { struct mars_io *mars_io = (struct mars_io *)handle; return &mars_io->stat[thr_idx]; -} +} \ No newline at end of file diff --git a/infra/packet_io/mars_io.h b/infra/packet_io/mars_io.h index 66e41ee..e6ca493 100644 --- a/infra/packet_io/mars_io.h +++ b/infra/packet_io/mars_io.h @@ -9,7 +9,7 @@ extern "C" void *mars_io_new(const char *toml_file); void mars_io_free(void *handle); -int mars_io_isbreak(void *handle); +int mars_io_is_done(void *handle); int mars_io_init(void *handle, uint16_t thr_idx); int mars_io_recv(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts); diff --git a/infra/packet_io/packet_io.c b/infra/packet_io/packet_io.c index 9210927..902cea5 100644 --- a/infra/packet_io/packet_io.c +++ b/infra/packet_io/packet_io.c @@ -1,6 +1,23 @@ #include "pcap_io.h" #include "mars_io.h" +#include "ip_reassembly.h" #include "utils_internal.h" +#include "packet_internal.h" +#include "fieldstat/fieldstat_easy.h" + +#define SYNC_STAT_INTERVAL_MS 1000 + +struct packet_io_cfg +{ + char mode[64]; + uint64_t thread_num; // range [1, MAX_THREAD_NUM] + + // ip reassembly + uint64_t fail_action; // 0: bypass, 1: drop + uint64_t timeout_ms; // range: [1, 60000] (ms) + uint64_t frag_queue_num; // range: [1, 4294967295 + uint64_t frag_queue_size; // range: [2, 65535] +}; struct packet_io { @@ -8,80 +25,199 @@ struct packet_io void *(*new_func)(const char *toml_file); void (*free_func)(void *handle); - int (*isbreak_func)(void *handle); + int (*done_func)(void *handle); int (*init_func)(void *handle, uint16_t thr_idx); int (*recv_func)(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts); void (*send_func)(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts); void (*drop_func)(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts); void (*yield_func)(void *handle, uint16_t thr_idx); - void (*polling_func)(void *handle, uint16_t thr_idx); struct packet_io_stat *(*stat_func)(void *handle, uint16_t thr_idx); + + struct packet_io_cfg *cfg; + struct ip_reassembly *ip_reass[MAX_THREAD_NUM]; + struct fieldstat_easy *fs; + int pkt_io_fs_idx[PKT_IO_STAT_MAX]; + int ip_reass_fs_idx[IP_REASS_STAT_MAX]; }; -struct packet_io *packet_io_new(const char *toml_file) +/****************************************************************************** + * packet io cfg + ******************************************************************************/ + +static void packet_io_cfg_free(struct packet_io_cfg *cfg) { - char mode[64] = {0}; - struct packet_io *pkt_io = (struct packet_io *)calloc(1, sizeof(struct packet_io)); - if (pkt_io == NULL) + if (cfg) + { + free(cfg); + cfg = NULL; + } +} + +static struct packet_io_cfg *packet_io_cfg_new(const char *toml_file) +{ + struct packet_io_cfg *cfg = (struct packet_io_cfg *)calloc(1, sizeof(struct packet_io_cfg)); + if (cfg == NULL) { return NULL; } - load_toml_str_config(toml_file, "packet_io.mode", mode); - if (strcmp(mode, "marsio") == 0) + int ret = 0; + ret += load_toml_str_config(toml_file, "packet_io.mode", cfg->mode); + ret += load_toml_integer_config(toml_file, "packet_io.thread_num", &cfg->thread_num, 1, MAX_THREAD_NUM); + ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.fail_action", &cfg->fail_action, 0, 1); + ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.timeout_ms", &cfg->timeout_ms, 1, 60000); + ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_num", &cfg->frag_queue_num, 1, 4294967295); + ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_size", &cfg->frag_queue_size, 2, 65535); + + if (strcmp(cfg->mode, "marsio") != 0 && + strcmp(cfg->mode, "pcapfile") != 0 && + strcmp(cfg->mode, "pcaplist") != 0) + { + PACKET_IO_LOG_ERROR("packet_io.mode invalid: %s", cfg->mode); + free(cfg); + return NULL; + } + + if (ret != 0) + { + free(cfg); + return NULL; + } + + return cfg; +} + +static void packet_io_cfg_print(const struct packet_io_cfg *cfg) +{ + if (cfg) + { + PACKET_IO_LOG_INFO("packet_io.mode : %s", cfg->mode); + PACKET_IO_LOG_INFO("packet_io.thread_num : %lu", cfg->thread_num); + PACKET_IO_LOG_INFO("packet_io.ip_reassembly.fail_action : %lu", cfg->fail_action); + PACKET_IO_LOG_INFO("packet_io.ip_reassembly.timeout_ms : %lu", cfg->timeout_ms); + PACKET_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_num : %lu", cfg->frag_queue_num); + PACKET_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_size : %lu", cfg->frag_queue_size); + } +} + +/****************************************************************************** + * packet io + ******************************************************************************/ + +struct packet_io *packet_io_new(const char *toml_file) +{ + struct packet_io *pkt_io = (struct packet_io *)calloc(1, sizeof(struct packet_io)); + if (pkt_io == NULL) + { + PACKET_IO_LOG_ERROR("failed to allocate memory for packet_io"); + return NULL; + } + + pkt_io->cfg = packet_io_cfg_new(toml_file); + if (pkt_io->cfg == NULL) + { + PACKET_IO_LOG_ERROR("failed to create packet_io_cfg"); + goto error_out; + } + packet_io_cfg_print(pkt_io->cfg); + + if (strcmp(pkt_io->cfg->mode, "marsio") == 0) { pkt_io->new_func = mars_io_new; pkt_io->free_func = mars_io_free; - pkt_io->isbreak_func = mars_io_isbreak; + pkt_io->done_func = mars_io_is_done; pkt_io->init_func = mars_io_init; pkt_io->recv_func = mars_io_recv; pkt_io->send_func = mars_io_send; pkt_io->drop_func = mars_io_drop; pkt_io->yield_func = mars_io_yield; - pkt_io->polling_func = mars_io_polling; pkt_io->stat_func = mars_io_stat; } else { pkt_io->new_func = pcap_io_new; pkt_io->free_func = pcap_io_free; - pkt_io->isbreak_func = pcap_io_isbreak; + pkt_io->done_func = pcap_io_is_done; pkt_io->init_func = pcap_io_init; pkt_io->recv_func = pcap_io_recv; pkt_io->send_func = pcap_io_send; pkt_io->drop_func = pcap_io_drop; pkt_io->yield_func = pcap_io_yield; - pkt_io->polling_func = pcap_io_polling; pkt_io->stat_func = pcap_io_stat; } + for (uint64_t i = 0; i < pkt_io->cfg->thread_num; i++) + { + pkt_io->ip_reass[i] = ip_reassembly_new(pkt_io->cfg->timeout_ms, pkt_io->cfg->frag_queue_num, pkt_io->cfg->frag_queue_size); + if (pkt_io->ip_reass[i] == NULL) + { + PACKET_IO_LOG_ERROR("failed to create ip_reassembly"); + goto error_out; + } + } + + pkt_io->fs = fieldstat_easy_new(pkt_io->cfg->thread_num, "packet_io", NULL, 0); + if (pkt_io->fs == NULL) + { + PACKET_IO_LOG_ERROR("failed to create fieldstat_easy"); + goto error_out; + } + if (fieldstat_easy_enable_auto_output(pkt_io->fs, "packet_io.fs4", 2) != 0) + { + PACKET_IO_LOG_ERROR("failed to enable auto output for fieldstat_easy"); + goto error_out; + } + for (int i = 0; i < PKT_IO_STAT_MAX; i++) + { + pkt_io->pkt_io_fs_idx[i] = fieldstat_easy_register_counter(pkt_io->fs, pkt_io_stat_str[i]); + } + for (int i = 0; i < IP_REASS_STAT_MAX; i++) + { + pkt_io->ip_reass_fs_idx[i] = fieldstat_easy_register_counter(pkt_io->fs, ip_reass_stat_str[i]); + } + pkt_io->handle = pkt_io->new_func(toml_file); if (pkt_io->handle == NULL) { - packet_io_free(pkt_io); - return NULL; + PACKET_IO_LOG_ERROR("failed to create packet_io handle"); + goto error_out; } return pkt_io; + +error_out: + packet_io_free(pkt_io); + return NULL; } void packet_io_free(struct packet_io *pkt_io) { if (pkt_io) { - if (pkt_io->handle) + if (pkt_io->cfg) { - pkt_io->free_func(pkt_io->handle); + for (uint64_t i = 0; i < pkt_io->cfg->thread_num; i++) + { + ip_reassembly_free(pkt_io->ip_reass[i]); + } } + + pkt_io->free_func(pkt_io->handle); + if (pkt_io->fs) + { + fieldstat_easy_free(pkt_io->fs); + } + packet_io_cfg_free(pkt_io->cfg); + free(pkt_io); pkt_io = NULL; } } -int packet_io_isbreak(struct packet_io *pkt_io) +int packet_io_is_done(struct packet_io *pkt_io) { - return pkt_io->isbreak_func(pkt_io->handle); + return pkt_io->done_func(pkt_io->handle); } int packet_io_init(struct packet_io *pkt_io, uint16_t thr_idx) @@ -91,17 +227,82 @@ int packet_io_init(struct packet_io *pkt_io, uint16_t thr_idx) int packet_io_recv(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { - return pkt_io->recv_func(pkt_io->handle, thr_idx, pkts, nr_pkts); + struct packet *pkt = NULL; + struct packet *defrag = NULL; + struct ip_reassembly *ip_reass = pkt_io->ip_reass[thr_idx]; + uint64_t now_ms = clock_get_real_time_ms(); + + int nr_ret = 0; + int nr_recv = pkt_io->recv_func(pkt_io->handle, thr_idx, pkts, nr_pkts); + + for (int i = 0; i < nr_recv; i++) + { + pkt = pkts[i]; + if (packet_is_fragment(pkt)) + { + defrag = ip_reassembly_defrag(ip_reass, pkt, now_ms); + if (defrag) + { + pkts[nr_ret++] = defrag; + } + } + else + { + pkts[nr_ret++] = pkt; + continue; + } + } + + return nr_ret; } void packet_io_send(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { - pkt_io->send_func(pkt_io->handle, thr_idx, pkts, nr_pkts); + struct packet *frag = NULL; + struct packet *pkt = NULL; + + for (int i = 0; i < nr_pkts; i++) + { + pkt = pkts[i]; + if (packet_is_defraged(pkt)) + { + while ((frag = packet_pop_frag(pkt))) + { + // TODO check len vs MTU, if len > MTU, fragment it + pkt_io->send_func(pkt_io->handle, thr_idx, &frag, 1); + } + packet_free(pkt); + } + else + { + pkt_io->send_func(pkt_io->handle, thr_idx, &pkt, 1); + } + pkts[i] = NULL; + } } void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { - pkt_io->drop_func(pkt_io->handle, thr_idx, pkts, nr_pkts); + struct packet *frag = NULL; + struct packet *pkt = NULL; + + for (int i = 0; i < nr_pkts; i++) + { + pkt = pkts[i]; + if (packet_is_defraged(pkt)) + { + while ((frag = packet_pop_frag(pkt))) + { + pkt_io->drop_func(pkt_io->handle, thr_idx, &frag, 1); + } + packet_free(pkt); + } + else + { + pkt_io->drop_func(pkt_io->handle, thr_idx, &pkt, 1); + } + pkts[i] = NULL; + } } void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx) @@ -109,12 +310,56 @@ void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx) pkt_io->yield_func(pkt_io->handle, thr_idx); } -void packet_io_polling(struct packet_io *pkt_io, uint16_t thr_idx) +void packet_io_clean(struct packet_io *pkt_io, uint16_t thr_idx) { - pkt_io->polling_func(pkt_io->handle, thr_idx); + struct packet *pkt = NULL; + uint64_t now_ms = clock_get_real_time_ms(); + struct ip_reassembly *ip_reass = pkt_io->ip_reass[thr_idx]; + + while ((pkt = ip_reassembly_clean(ip_reass, now_ms))) + { + if (pkt_io->cfg->fail_action == 0) + { + packet_io_send(pkt_io, thr_idx, &pkt, 1); + } + else + { + packet_io_drop(pkt_io, thr_idx, &pkt, 1); + } + } + + static __thread uint64_t last_sync_stat_ms = 0; + static __thread struct packet_io_stat pkt_io_last_stat = {0}; + static __thread struct ip_reassembly_stat ip_reass_last_stat = {0}; + if (now_ms - last_sync_stat_ms >= SYNC_STAT_INTERVAL_MS) + { + struct packet_io_stat *pkt_io_curr_stat = pkt_io->stat_func(pkt_io->handle, thr_idx); + struct ip_reassembly_stat *ip_reass_curr_stat = ip_reassembly_get_stat(pkt_io->ip_reass[thr_idx]); + + for (int i = 0; i < PKT_IO_STAT_MAX; i++) + { + uint64_t val = packet_io_stat_get(pkt_io_curr_stat, i) - packet_io_stat_get(&pkt_io_last_stat, i); + fieldstat_easy_counter_incrby(pkt_io->fs, thr_idx, pkt_io->pkt_io_fs_idx[i], NULL, 0, val); + } + for (int i = 0; i < IP_REASS_STAT_MAX; i++) + { + uint64_t val = ip_reassembly_stat_get(ip_reass_curr_stat, i) - ip_reassembly_stat_get(&ip_reass_last_stat, i); + fieldstat_easy_counter_incrby(pkt_io->fs, thr_idx, pkt_io->ip_reass_fs_idx[i], NULL, 0, val); + } + pkt_io_last_stat = *pkt_io_curr_stat; + ip_reass_last_stat = *ip_reass_curr_stat; + last_sync_stat_ms = now_ms; + } } -struct packet_io_stat *packet_io_stat(struct packet_io *pkt_io, uint16_t thr_idx) +uint64_t packet_io_stat_get(struct packet_io_stat *stat, enum pkt_io_stat_type type) { - return pkt_io->stat_func(pkt_io->handle, thr_idx); -} + switch (type) + { +#define XX(_type, _name) case _type: return stat->_name; + PKT_IO_STAT_MAP(XX) +#undef XX + default: + return 0; + } +} \ No newline at end of file diff --git a/infra/packet_io/packet_io.h b/infra/packet_io/packet_io.h index 4d9d457..3a508d5 100644 --- a/infra/packet_io/packet_io.h +++ b/infra/packet_io/packet_io.h @@ -7,8 +7,13 @@ extern "C" #include +#include "log_internal.h" #include "stellar/packet.h" +#define PACKET_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "packet IO", format, ##__VA_ARGS__) +#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet IO", format, ##__VA_ARGS__) +#define PACKET_IO_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "packet IO", format, ##__VA_ARGS__) + struct packet_io_stat { // device packet @@ -35,18 +40,49 @@ struct packet_io_stat uint64_t bytes_user_freed; } __attribute__((aligned(64))); +#define PKT_IO_STAT_MAP(XX) \ + XX(PKT_IO_STAT_PKTS_RX, pkts_rx) \ + XX(PKT_IO_STAT_BYTES_RX, bytes_rx) \ + XX(PKT_IO_STAT_PKTS_TX, pkts_tx) \ + XX(PKT_IO_STAT_BYTES_TX, bytes_tx) \ + XX(PKT_IO_STAT_KEEP_ALIVE_PKTS, keep_alive_pkts) \ + XX(PKT_IO_STAT_KEEP_ALIVE_BYTES, keep_alive_bytes) \ + XX(PKT_IO_STAT_PKTS_DROPPED, pkts_dropped) \ + XX(PKT_IO_STAT_BYTES_DROPPED, bytes_dropped) \ + XX(PKT_IO_STAT_PKTS_INJECTED, pkts_injected) \ + XX(PKT_IO_STAT_BYTES_INJECTED, bytes_injected) \ + XX(PKT_IO_STAT_PKTS_USER_FREED, pkts_user_freed) \ + XX(PKT_IO_STAT_BYTES_USER_FREED, bytes_user_freed) + +enum pkt_io_stat_type +{ +#define XX(type, name) type, + PKT_IO_STAT_MAP(XX) +#undef XX + PKT_IO_STAT_MAX +}; + +__attribute__((unused)) static const char pkt_io_stat_str[PKT_IO_STAT_MAX][64] = +{ +#define XX(type, name) #name, + PKT_IO_STAT_MAP(XX) +#undef XX +}; + struct packet_io; struct packet_io *packet_io_new(const char *toml_file); void packet_io_free(struct packet_io *pkt_io); -int packet_io_isbreak(struct packet_io *pkt_io); +// only used in pcap mode, to check if all pcap has been processed +int packet_io_is_done(struct packet_io *pkt_io); int packet_io_init(struct packet_io *pkt_io, uint16_t thr_idx); int packet_io_recv(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts[], int nr_pkts); void packet_io_send(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts[], int nr_pkts); void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts[], int nr_pkts); void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx); -void packet_io_polling(struct packet_io *pkt_io, uint16_t thr_idx); -struct packet_io_stat *packet_io_stat(struct packet_io *pkt_io, uint16_t thr_idx); +void packet_io_clean(struct packet_io *pkt_io, uint16_t thr_idx); + +uint64_t packet_io_stat_get(struct packet_io_stat *stat, enum pkt_io_stat_type type); #ifdef __cplusplus } diff --git a/infra/packet_io/pcap_io.c b/infra/packet_io/pcap_io.c index 3a3cf4e..8242a18 100644 --- a/infra/packet_io/pcap_io.c +++ b/infra/packet_io/pcap_io.c @@ -8,15 +8,9 @@ #include "packet_dump.h" #include "packet_pool.h" #include "packet_parser.h" -#include "ip_reassembly.h" -#include "log_internal.h" #include "packet_internal.h" #include "utils_internal.h" -#define PCAP_IO_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "pcap IO", format, ##__VA_ARGS__) -#define PCAP_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "pcap IO", format, ##__VA_ARGS__) -#define PCAP_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "pcap IO", format, ##__VA_ARGS__) - #define RING_BUFFER_MAX_SIZE (4096 * 1000) struct pcap_pkt @@ -30,16 +24,11 @@ struct pcap_io_cfg { char mode[16]; // pcapfile, pcaplist char pcap_path[PATH_MAX]; - uint16_t thread_num; // range [1, MAX_THREAD_NUM] + uint64_t pcap_done_exit; // range [0, 1] + uint64_t thread_num; // range [1, MAX_THREAD_NUM] // packet pool uint64_t capacity; // range: [1, 4294967295] - - // ip reassembly - uint64_t fail_action; // 0: bypass, 1: drop - uint64_t timeout_ms; // range: [1, 60000] (ms) - uint64_t frag_queue_num; // range: [1, 4294967295 - uint64_t frag_queue_size; // range: [2, 65535] }; struct pcap_io @@ -51,7 +40,6 @@ struct pcap_io struct ring_buffer *ring[MAX_THREAD_NUM]; struct packet_pool *pool[MAX_THREAD_NUM]; struct packet_io_stat stat[MAX_THREAD_NUM]; - struct ip_reassembly *ip_reass[MAX_THREAD_NUM]; uint64_t io_thread_need_exit; uint64_t io_thread_is_runing; @@ -78,14 +66,14 @@ static struct ring_buffer *ring_buffer_new(uint32_t size) struct ring_buffer *ring = (struct ring_buffer *)calloc(1, sizeof(struct ring_buffer)); if (ring == NULL) { - PCAP_IO_LOG_ERROR("unable to new ring buffer"); + PACKET_IO_LOG_ERROR("unable to new ring buffer"); return NULL; } ring->buff = (uint64_t *)calloc(size, sizeof(uint64_t)); if (ring->buff == NULL) { - PCAP_IO_LOG_ERROR("unable to new ring buffer"); + PACKET_IO_LOG_ERROR("unable to new ring buffer"); free(ring); return NULL; } @@ -115,7 +103,7 @@ static int ring_buffer_push(struct ring_buffer *ring, void *data) { if (__sync_val_compare_and_swap(&ring->buff[ring->tail], 0, data) != 0) { - PCAP_IO_LOG_ERROR("ring buffer is full, retry later"); + PACKET_IO_LOG_ERROR("ring buffer is full, retry later"); return -1; } @@ -151,15 +139,12 @@ static struct pcap_io_cfg *pcap_io_cfg_new(const char *toml_file) int ret = 0; ret += load_toml_str_config(toml_file, "packet_io.mode", cfg->mode); ret += load_toml_str_config(toml_file, "packet_io.pcap_path", cfg->pcap_path); - ret += load_toml_integer_config(toml_file, "packet_io.thread_num", (uint64_t *)&cfg->thread_num, 1, MAX_THREAD_NUM); + ret += load_toml_integer_config(toml_file, "packet_io.pcap_done_exit", &cfg->pcap_done_exit, 0, 1); + ret += load_toml_integer_config(toml_file, "packet_io.thread_num", &cfg->thread_num, 1, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "packet_io.packet_pool.capacity", &cfg->capacity, 1, 4294967295); - ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.fail_action", &cfg->fail_action, 0, 1); - ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.timeout_ms", &cfg->timeout_ms, 1, 60000); - ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_num", &cfg->frag_queue_num, 1, 4294967295); - ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_size", &cfg->frag_queue_size, 2, 65535); if (strcmp(cfg->mode, "pcapfile") != 0 && strcmp(cfg->mode, "pcaplist") != 0) { - PCAP_IO_LOG_ERROR("config file invalid packet_io.mode %s", cfg->mode); + PACKET_IO_LOG_ERROR("config file invalid packet_io.mode %s", cfg->mode); ret = -1; } @@ -187,14 +172,9 @@ static void pcap_io_cfg_print(const struct pcap_io_cfg *cfg) { if (cfg) { - PCAP_IO_LOG_INFO("packet_io.mode : %s", cfg->mode); - PCAP_IO_LOG_INFO("packet_io.pcap_path : %s", cfg->pcap_path); - PCAP_IO_LOG_INFO("packet_io.thread_num : %ld", cfg->thread_num); - PCAP_IO_LOG_INFO("packet_io.packet_pool.capacity : %lu", cfg->capacity); - PCAP_IO_LOG_INFO("packet_io.ip_reassembly.fail_action : %lu", cfg->fail_action); - PCAP_IO_LOG_INFO("packet_io.ip_reassembly.timeout_ms : %lu", cfg->timeout_ms); - PCAP_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_num : %lu", cfg->frag_queue_num); - PCAP_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_size : %lu", cfg->frag_queue_size); + PACKET_IO_LOG_INFO("packet_io.pcap_path : %s", cfg->pcap_path); + PACKET_IO_LOG_INFO("packet_io.pcap_done_exit : %lu", cfg->pcap_done_exit); + PACKET_IO_LOG_INFO("packet_io.packet_pool.capacity : %lu", cfg->capacity); } } @@ -206,30 +186,30 @@ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_ { struct pcap_io *pcap_io = (struct pcap_io *)user; - struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)calloc(1, sizeof(struct pcap_pkt) + h->caplen); - if (pcap_pkt == NULL) + struct pcap_pkt *pcap = (struct pcap_pkt *)calloc(1, sizeof(struct pcap_pkt) + h->caplen); + if (pcap == NULL) { - PCAP_IO_LOG_ERROR("unable to alloc packet"); + PACKET_IO_LOG_ERROR("unable to alloc packet"); return; } - pcap_pkt->data = (char *)pcap_pkt + sizeof(struct pcap_pkt); - pcap_pkt->len = h->caplen; - pcap_pkt->ts = h->ts; - memcpy((char *)pcap_pkt->data, bytes, h->caplen); + pcap->data = (char *)pcap + sizeof(struct pcap_pkt); + pcap->len = h->caplen; + pcap->ts = h->ts; + memcpy((char *)pcap->data, bytes, h->caplen); ATOMIC_INC(&pcap_io->read_pcap_pkts); struct packet pkt; memset(&pkt, 0, sizeof(struct packet)); - packet_parse(&pkt, pcap_pkt->data, pcap_pkt->len); + packet_parse(&pkt, pcap->data, pcap->len); uint64_t hash = packet_ldbc_hash(&pkt, PKT_LDBC_METH_OUTERMOST_INT_EXT_IP, PACKET_DIRECTION_OUTGOING); struct ring_buffer *ring = pcap_io->ring[hash % pcap_io->cfg->thread_num]; - while (ring_buffer_push(ring, pcap_pkt) == -1) + while (ring_buffer_push(ring, pcap) == -1) { if (ATOMIC_READ(&pcap_io->io_thread_need_exit)) { - free(pcap_pkt); - PCAP_IO_LOG_FATAL("pcap io thread need exit"); + free(pcap); + PACKET_IO_LOG_FATAL("pcap io thread need exit"); pcap_breakloop(pcap_io->pcap); break; } @@ -238,7 +218,7 @@ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_ if (ATOMIC_READ(&pcap_io->io_thread_need_exit)) { - PCAP_IO_LOG_FATAL("pcap io thread need exit"); + PACKET_IO_LOG_FATAL("pcap io thread need exit"); pcap_breakloop(pcap_io->pcap); } } @@ -249,19 +229,19 @@ static int pcap_io_handler(struct pcap_io *pcap_io, const char *pcap_file) char pcap_errbuf[PCAP_ERRBUF_SIZE]; realpath(pcap_file, resolved_path); - PCAP_IO_LOG_FATAL("pcap %s in-processing", resolved_path) + PACKET_IO_LOG_FATAL("pcap %s in-processing", resolved_path) pcap_io->pcap = pcap_open_offline(resolved_path, pcap_errbuf); if (pcap_io->pcap == NULL) { - PCAP_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); + PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); return -1; } pcap_io->read_pcap_files++; pcap_loop(pcap_io->pcap, -1, pcap_pkt_handler, (u_char *)pcap_io); pcap_close(pcap_io->pcap); - PCAP_IO_LOG_FATAL("pcap %s processed", resolved_path) + PACKET_IO_LOG_FATAL("pcap %s processed", resolved_path) return 0; } @@ -290,7 +270,7 @@ static void *pcap_io_thread(void *arg) __thread_local_logger = pcap_io->logger; ATOMIC_SET(&pcap_io->io_thread_is_runing, 1); - PCAP_IO_LOG_FATAL("pcap io thread is running"); + PACKET_IO_LOG_FATAL("pcap io thread is running"); if (strcmp(pcap_io->cfg->mode, "pcapfile") == 0) { @@ -301,7 +281,7 @@ static void *pcap_io_thread(void *arg) FILE *fp = NULL; if (strcmp(pcap_io->cfg->pcap_path, "-") == 0) { - PCAP_IO_LOG_ERROR("pcap path is empty, read from stdin"); + PACKET_IO_LOG_ERROR("pcap path is empty, read from stdin"); fp = stdin; } else @@ -309,7 +289,7 @@ static void *pcap_io_thread(void *arg) fp = fopen(pcap_io->cfg->pcap_path, "r"); if (fp == NULL) { - PCAP_IO_LOG_ERROR("unable to open pcap path: %s", pcap_io->cfg->pcap_path); + PACKET_IO_LOG_ERROR("unable to open pcap path: %s", pcap_io->cfg->pcap_path); goto erro_out; } } @@ -356,128 +336,17 @@ static void origin_free_cb(struct packet *pkt, void *args) { struct pcap_io *pcap_io = (struct pcap_io *)args; struct packet_origin *origin = packet_get_origin(pkt); - struct pcap_pkt *pcap_pkt = origin->ctx; + struct pcap_pkt *pcap = origin->ctx; struct packet_io_stat *stat = &pcap_io->stat[origin->thr_idx]; struct packet_pool *pool = pcap_io->pool[origin->thr_idx]; stat->pkts_user_freed++; stat->bytes_user_freed += packet_get_raw_len(pkt); - free(pcap_pkt); + free(pcap); packet_pool_push(pool, pkt); } -static struct packet *recv_packet(struct pcap_io *pcap_io, struct pcap_pkt *pcap_pkt, uint16_t thr_idx) -{ - struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; - struct ip_reassembly *ip_reass = pcap_io->ip_reass[thr_idx]; - struct packet_pool *pool = pcap_io->pool[thr_idx]; - - if (pcap_pkt == NULL) - { - return NULL; - } - - stat->pkts_rx++; - stat->bytes_rx += pcap_pkt->len; - - struct packet *pkt = packet_pool_pop(pool); - assert(pkt != NULL); - struct packet_origin origin = { - .type = ORIGIN_TYPE_PCAP, - .ctx = pcap_pkt, - .cb = origin_free_cb, - .args = pcap_io, - .thr_idx = thr_idx, - }; - packet_parse(pkt, pcap_pkt->data, pcap_pkt->len); - memset(&pkt->meta, 0, sizeof(pkt->meta)); - packet_set_action(pkt, PACKET_ACTION_FORWARD); - packet_set_timeval(pkt, &pcap_pkt->ts); - packet_set_origin(pkt, &origin); - - if (packet_is_fragment(pkt)) - { - return ip_reassembly_defrag(ip_reass, pkt, clock_get_real_time_ms()); - } - else - { - return pkt; - } -} - -static void send_packet(struct pcap_io *pcap_io, struct packet *pkt, uint16_t thr_idx) -{ - struct pcap_pkt *pcap_pkt = NULL; - struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; - int len = packet_get_raw_len(pkt); - struct packet_origin *origin = packet_get_origin(pkt); - - if (origin->type == ORIGIN_TYPE_PCAP) - { - pcap_pkt = (struct pcap_pkt *)origin->ctx; - free(pcap_pkt); - packet_pool_push(pcap_io->pool[thr_idx], pkt); - } - else - { - stat->pkts_injected++; - stat->bytes_injected += len; - - struct tuple6 tuple; - char file[PATH_MAX] = {0}; - char src_addr[INET6_ADDRSTRLEN] = {0}; - char dst_addr[INET6_ADDRSTRLEN] = {0}; - - memset(&tuple, 0, sizeof(struct tuple6)); - packet_get_innermost_tuple6(pkt, &tuple); - if (tuple.addr_family == AF_INET) - { - inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr, INET6_ADDRSTRLEN); - inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr, INET6_ADDRSTRLEN); - } - else - { - inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN); - inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN); - } - snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected); - if (packet_dump_pcap(pkt, file) == -1) - { - PCAP_IO_LOG_ERROR("unable to dump pcap file: %s", file); - } - else - { - PCAP_IO_LOG_FATAL("dump inject packet: %s", file); - } - packet_free(pkt); - } - - stat->pkts_tx++; - stat->bytes_tx += len; -} - -static void drop_packet(struct pcap_io *pcap_io, struct packet *pkt, uint16_t thr_idx) -{ - struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; - int len = packet_get_raw_len(pkt); - struct packet_origin *origin = packet_get_origin(pkt); - - stat->pkts_dropped++; - stat->bytes_dropped += len; - - if (origin->type == ORIGIN_TYPE_PCAP) - { - struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)origin->ctx; - free(pcap_pkt); - packet_pool_push(pcap_io->pool[thr_idx], pkt); - } - else - { - packet_free(pkt); - } -} - /****************************************************************************** * Public API ******************************************************************************/ @@ -488,44 +357,39 @@ void *pcap_io_new(const char *toml_file) struct pcap_io *pcap_io = (struct pcap_io *)calloc(1, sizeof(struct pcap_io)); if (pcap_io == NULL) { - PCAP_IO_LOG_ERROR("unable to allocate memory for pcap_io"); + PACKET_IO_LOG_ERROR("unable to allocate memory for pcap_io"); return NULL; } pcap_io->cfg = pcap_io_cfg_new(toml_file); if (pcap_io->cfg == NULL) { - PCAP_IO_LOG_ERROR("unable to create pcap_io_cfg"); + PACKET_IO_LOG_ERROR("unable to create pcap_io_cfg"); goto error_out; } pcap_io_cfg_print(pcap_io->cfg); pcap_io->logger = __thread_local_logger; + for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++) { pcap_io->ring[i] = ring_buffer_new(RING_BUFFER_MAX_SIZE); if (pcap_io->ring[i] == NULL) { - PCAP_IO_LOG_ERROR("unable to create ring buffer"); + PACKET_IO_LOG_ERROR("unable to create ring buffer"); goto error_out; } pcap_io->pool[i] = packet_pool_new(pcap_io->cfg->capacity); if (pcap_io->pool[i] == NULL) { - PCAP_IO_LOG_ERROR("unable to create packet pool"); - goto error_out; - } - pcap_io->ip_reass[i] = ip_reassembly_new(pcap_io->cfg->timeout_ms, pcap_io->cfg->frag_queue_num, pcap_io->cfg->frag_queue_size); - if (pcap_io->ip_reass[i] == NULL) - { - PCAP_IO_LOG_ERROR("unable to create ip reassembly"); + PACKET_IO_LOG_ERROR("unable to create packet pool"); goto error_out; } } if (pthread_create(&tid, NULL, pcap_io_thread, (void *)pcap_io) != 0) { - PCAP_IO_LOG_ERROR("unable to create pcap io thread"); + PACKET_IO_LOG_ERROR("unable to create pcap io thread"); goto error_out; } @@ -548,22 +412,21 @@ void pcap_io_free(void *handle) usleep(1000); } - struct pcap_pkt *pcap_pkt = NULL; + struct pcap_pkt *pcap = NULL; for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++) { while (1) { - ring_buffer_pop(pcap_io->ring[i], (void **)&pcap_pkt); - if (pcap_pkt) + ring_buffer_pop(pcap_io->ring[i], (void **)&pcap); + if (pcap) { - free(pcap_pkt); + free(pcap); } else { break; } } - ip_reassembly_free(pcap_io->ip_reass[i]); packet_pool_free(pcap_io->pool[i]); ring_buffer_free(pcap_io->ring[i]); } @@ -573,11 +436,18 @@ void pcap_io_free(void *handle) } } -int pcap_io_isbreak(void *handle) +int pcap_io_is_done(void *handle) { struct pcap_io *pcap_io = (struct pcap_io *)handle; - return ATOMIC_READ(&pcap_io->io_thread_wait_exit); + if (pcap_io->cfg->pcap_done_exit) + { + return ATOMIC_READ(&pcap_io->io_thread_wait_exit); + } + else + { + return 0; + } } int pcap_io_init(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) @@ -588,19 +458,40 @@ int pcap_io_init(void *handle __attribute__((unused)), uint16_t thr_idx __attrib int pcap_io_recv(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { struct packet *pkt = NULL; - struct pcap_pkt *pcap_pkt = NULL; + struct pcap_pkt *pcap = NULL; struct pcap_io *pcap_io = (struct pcap_io *)handle; struct ring_buffer *ring = pcap_io->ring[thr_idx]; + struct packet_pool *pool = pcap_io->pool[thr_idx]; + struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; int ret = 0; for (int i = 0; i < nr_pkts; i++) { - ring_buffer_pop(ring, (void **)&pcap_pkt); - pkt = recv_packet(pcap_io, pcap_pkt, thr_idx); - if (pkt) + ring_buffer_pop(ring, (void **)&pcap); + if (pcap == NULL) { - pkts[ret++] = pkt; + break; } + + stat->pkts_rx++; + stat->bytes_rx += pcap->len; + + pkt = packet_pool_pop(pool); + assert(pkt != NULL); + struct packet_origin origin = { + .type = ORIGIN_TYPE_PCAP, + .ctx = pcap, + .cb = origin_free_cb, + .args = pcap_io, + .thr_idx = thr_idx, + }; + packet_parse(pkt, pcap->data, pcap->len); + memset(&pkt->meta, 0, sizeof(pkt->meta)); + packet_set_action(pkt, PACKET_ACTION_FORWARD); + packet_set_timeval(pkt, &pcap->ts); + packet_set_origin(pkt, &origin); + + pkts[ret++] = pkt; } return ret; @@ -608,51 +499,94 @@ int pcap_io_recv(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_p void pcap_io_send(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { - struct packet *frag = NULL; + int len = 0; + struct tuple6 tuple; + char file[PATH_MAX] = {0}; struct packet *pkt = NULL; + struct pcap_pkt *pcap = NULL; + struct packet_origin *origin = NULL; + char src_addr_str[INET6_ADDRSTRLEN] = {0}; + char dst_addr_str[INET6_ADDRSTRLEN] = {0}; struct pcap_io *pcap_io = (struct pcap_io *)handle; + struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; for (int i = 0; i < nr_pkts; i++) { pkt = pkts[i]; - if (packet_is_defraged(pkt)) + len = packet_get_raw_len(pkt); + origin = packet_get_origin(pkt); + + stat->pkts_tx++; + stat->bytes_tx += len; + + if (origin->type == ORIGIN_TYPE_PCAP) { - while ((frag = packet_pop_frag(pkt))) - { - send_packet(pcap_io, frag, thr_idx); - } - packet_free(pkt); + pcap = (struct pcap_pkt *)origin->ctx; + free(pcap); + packet_pool_push(pcap_io->pool[thr_idx], pkt); } else { - send_packet(pcap_io, pkt, thr_idx); + stat->pkts_injected++; + stat->bytes_injected += len; + + memset(&tuple, 0, sizeof(struct tuple6)); + packet_get_innermost_tuple6(pkt, &tuple); + if (tuple.addr_family == AF_INET) + { + inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr_str, INET6_ADDRSTRLEN); + inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr_str, INET6_ADDRSTRLEN); + } + else + { + inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr_str, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr_str, INET6_ADDRSTRLEN); + } + snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr_str, ntohs(tuple.src_port), dst_addr_str, ntohs(tuple.dst_port), stat->pkts_injected); + if (packet_dump_pcap(pkt, file) == -1) + { + PACKET_IO_LOG_ERROR("unable to dump pcap file: %s", file); + } + else + { + PACKET_IO_LOG_FATAL("dump inject packet: %s", file); + } + packet_free(pkt); } + pkts[i] = NULL; } } void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { + int len = 0; struct packet *pkt = NULL; - struct packet *frag = NULL; + struct pcap_pkt *pcap = NULL; + struct packet_origin *origin = NULL; struct pcap_io *pcap_io = (struct pcap_io *)handle; + struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; for (int i = 0; i < nr_pkts; i++) { pkt = pkts[i]; - if (packet_is_defraged(pkt)) + len = packet_get_raw_len(pkt); + origin = packet_get_origin(pkt); + + stat->pkts_dropped++; + stat->bytes_dropped += len; + + if (origin->type == ORIGIN_TYPE_PCAP) { - while ((frag = packet_pop_frag(pkt))) - { - drop_packet(pcap_io, frag, thr_idx); - } - packet_free(pkt); + pcap = (struct pcap_pkt *)origin->ctx; + free(pcap); + packet_pool_push(pcap_io->pool[thr_idx], pkt); } else { - drop_packet(pcap_io, pkt, thr_idx); + packet_free(pkt); } pkts[i] = NULL; } @@ -663,29 +597,6 @@ void pcap_io_yield(void *handle __attribute__((unused)), uint16_t thr_idx __attr return; } -void pcap_io_polling(void *handle, uint16_t thr_idx) -{ - struct pcap_io *pcap_io = (struct pcap_io *)handle; - struct ip_reassembly *ip_reass = pcap_io->ip_reass[thr_idx]; - struct packet *pkt = NULL; - uint64_t now_ms = clock_get_real_time_ms(); - - while ((pkt = ip_reassembly_clean(ip_reass, now_ms))) - { - if (pcap_io->cfg->fail_action == 0) - { - send_packet(pcap_io, pkt, thr_idx); - } - else - { - drop_packet(pcap_io, pkt, thr_idx); - } - } - - // TODO - // output stat -} - struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx) { struct pcap_io *pcap_io = (struct pcap_io *)handle; diff --git a/infra/packet_io/pcap_io.h b/infra/packet_io/pcap_io.h index 9556eda..a8dff0b 100644 --- a/infra/packet_io/pcap_io.h +++ b/infra/packet_io/pcap_io.h @@ -9,7 +9,7 @@ extern "C" void *pcap_io_new(const char *toml_file); void pcap_io_free(void *handle); -int pcap_io_isbreak(void *handle); +int pcap_io_is_done(void *handle); int pcap_io_init(void *handle, uint16_t thr_idx); int pcap_io_recv(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts); diff --git a/infra/packet_io/test/conf/pcap_io.toml b/infra/packet_io/test/conf/pcap_io.toml index 602f778..3bf7cfc 100644 --- a/infra/packet_io/test/conf/pcap_io.toml +++ b/infra/packet_io/test/conf/pcap_io.toml @@ -3,6 +3,7 @@ app_symbol = "stellar" dev_symbol = "nf_0_fw" pcap_path = "./pcap/IPv4_frags_UDP.pcap" + pcap_done_exit = 1 # range: [0, 1] thread_num = 1 # range: [1, 256] cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12] idle_yield_ms = 900 # range: [0, 60000] (ms) diff --git a/infra/packet_io/test/gtest_packet_io.cpp b/infra/packet_io/test/gtest_packet_io.cpp index 215c9b3..cdb6877 100644 --- a/infra/packet_io/test/gtest_packet_io.cpp +++ b/infra/packet_io/test/gtest_packet_io.cpp @@ -242,7 +242,7 @@ TEST(PCAP_IO, IP_REASSEMBLY) EXPECT_TRUE(memcmp(data, expect, sizeof(expect)) == 0); packet_io_drop(pkt_io, thr_idx, pkts, nr_pkts); - packet_io_polling(pkt_io, thr_idx); + packet_io_clean(pkt_io, thr_idx); packet_io_free(pkt_io); } diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index 83f84c0..a8a3879 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -516,6 +516,16 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id) mq_runtime_dispatch(mq_rte); } +<<<<<<< HEAD +======= + // flush stat before free + for (int i = 0; i < SESS_MGR_STAT_MAX; i++) + { + uint64_t val = session_manager_stat_get(stat, i); + fieldstat_easy_counter_set(sess_mgr->fs, thread_id, sess_mgr->stat_idx[i], NULL, 0, val); + } + +>>>>>>> 29ffae8 (packet IO support output fieldstat) session_manager_rte_free(sess_mgr->rte[thread_id]); sess_mgr->rte[thread_id] = NULL; } diff --git a/infra/stellar_core.c b/infra/stellar_core.c index ab9a39f..16e4f4f 100644 --- a/infra/stellar_core.c +++ b/infra/stellar_core.c @@ -86,7 +86,7 @@ static void *worker_thread(void *arg) } stellar_polling_dispatch(mod_mgr); - packet_io_polling(pkt_io, thread_id); + packet_io_clean(pkt_io, thread_id); if (nr_recv == 0) { packet_io_yield(pkt_io, thread_id); @@ -211,7 +211,7 @@ void stellar_run(struct stellar *st) usleep(1000); // 1ms // only available in pcap mode - if (packet_io_isbreak(st->pkt_io)) + if (packet_io_is_done(st->pkt_io)) { ATOMIC_SET(&st->need_exit, 1); break;