#include "pcap_io.h"
#include "mars_io.h"
#include "ip_reassembly.h"
#include "utils_internal.h"
#include "packet_internal.h"
#include "fieldstat/fieldstat_easy.h"

/* Settings read from the "packet_io" section of the TOML configuration file. */
struct packet_io_cfg
{
    char mode[64];       // accepted values: "marsio", "pcapfile", "pcaplist"
    uint64_t thread_num; // range [1, MAX_THREAD_NUM]

    // ip reassembly
    uint64_t fail_action;     // 0: bypass, 1: drop
    uint64_t timeout_ms;      // range: [1, 60000] (ms)
    uint64_t frag_queue_num;  // range: [1, 4294967295]
    uint64_t frag_queue_size; // range: [2, 65535]
};

/*
 * Packet I/O front end.
 *
 * Wraps one backend (mars_io or pcap_io) behind a table of function pointers
 * and owns one IP reassembler per worker thread plus a fieldstat instance
 * used to export counters.
 */
struct packet_io
{
    void *handle; // backend instance returned by new_func()

    // backend dispatch table, filled in by packet_io_new() according to cfg->mode
    void *(*new_func)(const char *toml_file);
    void (*free_func)(void *handle);
    int (*done_func)(void *handle);
    int (*init_func)(void *handle, uint16_t thr_idx);
    int (*recv_func)(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts);
    void (*send_func)(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts);
    void (*drop_func)(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts);
    void (*yield_func)(void *handle, uint16_t thr_idx);
    struct packet_io_stat *(*stat_func)(void *handle, uint16_t thr_idx);

    struct packet_io_cfg *cfg;
    struct ip_reassembly *ip_reass[MAX_THREAD_NUM]; // only the first cfg->thread_num slots are populated

    struct fieldstat_easy *fs;
    int pkt_io_fs_idx[PKT_IO_STAT_MAX];     // fieldstat counter ids for packet io statistics
    int ip_reass_fs_idx[IP_REASS_STAT_MAX]; // fieldstat counter ids for ip reassembly statistics
};

/******************************************************************************
 * packet io cfg
 ******************************************************************************/

/* Release a configuration object; passing NULL is allowed. */
static void packet_io_cfg_free(struct packet_io_cfg *cfg)
{
    free(cfg);
}

/*
 * Load and validate the packet_io configuration from toml_file.
 *
 * Returns a heap-allocated configuration the caller must release with
 * packet_io_cfg_free(), or NULL when allocation fails, any key fails to
 * load, or packet_io.mode is not one of the accepted values.
 */
static struct packet_io_cfg *packet_io_cfg_new(const char *toml_file)
{
    struct packet_io_cfg *cfg = calloc(1, sizeof(*cfg));
    if (cfg == NULL)
    {
        return NULL;
    }

    // Every loader result is accumulated so that all keys are attempted
    // before the combined status is examined.
    int ret = 0;
    ret += load_toml_str_config(toml_file, "packet_io.mode", cfg->mode);
    ret += load_toml_integer_config(toml_file, "packet_io.thread_num", &cfg->thread_num, 1, MAX_THREAD_NUM);
    ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.fail_action", &cfg->fail_action, 0, 1);
    ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.timeout_ms", &cfg->timeout_ms, 1, 60000);
    ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_num", &cfg->frag_queue_num, 1, 4294967295);
    ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_size", &cfg->frag_queue_size, 2, 65535);

    if (strcmp(cfg->mode, "marsio") != 0 &&
        strcmp(cfg->mode, "pcapfile") != 0 &&
        strcmp(cfg->mode, "pcaplist") != 0)
    {
        PACKET_IO_LOG_ERROR("packet_io.mode invalid: %s", cfg->mode);
        free(cfg);
        return NULL;
    }

    if (ret != 0)
    {
        free(cfg);
        return NULL;
    }

    return cfg;
}

/* Log every configuration value at INFO level; does nothing for NULL. */
static void packet_io_cfg_print(const struct packet_io_cfg *cfg)
{
    if (cfg == NULL)
    {
        return;
    }

    PACKET_IO_LOG_INFO("packet_io.mode : %s", cfg->mode);
    PACKET_IO_LOG_INFO("packet_io.thread_num : %lu", cfg->thread_num);
    PACKET_IO_LOG_INFO("packet_io.ip_reassembly.fail_action : %lu", cfg->fail_action);
    PACKET_IO_LOG_INFO("packet_io.ip_reassembly.timeout_ms : %lu", cfg->timeout_ms);
    PACKET_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_num : %lu", cfg->frag_queue_num);
    PACKET_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_size : %lu", cfg->frag_queue_size);
}

/******************************************************************************
 * packet io
 ******************************************************************************/

/*
 * Create a packet_io instance from the given TOML configuration file.
 *
 * Selects the backend from packet_io.mode ("marsio" uses mars_io, any other
 * accepted mode uses pcap_io), creates one IP reassembler per configured
 * thread, sets up fieldstat counters and finally creates the backend handle.
 *
 * Returns NULL on any failure; everything created so far is released through
 * packet_io_free(). On success the caller owns the result and must release
 * it with packet_io_free().
 */
struct packet_io *packet_io_new(const char *toml_file)
{
    struct packet_io *pkt_io = calloc(1, sizeof(*pkt_io));
    if (pkt_io == NULL)
    {
        PACKET_IO_LOG_ERROR("failed to allocate memory for packet_io");
        return NULL;
    }

    pkt_io->cfg = packet_io_cfg_new(toml_file);
    if (pkt_io->cfg == NULL)
    {
        PACKET_IO_LOG_ERROR("failed to create packet_io_cfg");
        goto error_out;
    }
    packet_io_cfg_print(pkt_io->cfg);

    if (strcmp(pkt_io->cfg->mode, "marsio") == 0)
    {
        pkt_io->new_func = mars_io_new;
        pkt_io->free_func = mars_io_free;
        pkt_io->done_func = mars_io_is_done;
        pkt_io->init_func = mars_io_init;
        pkt_io->recv_func = mars_io_recv;
        pkt_io->send_func = mars_io_send;
        pkt_io->drop_func = mars_io_drop;
        pkt_io->yield_func = mars_io_yield;
        pkt_io->stat_func = mars_io_stat;
    }
    else
    {
        // packet_io_cfg_new() only accepts "marsio", "pcapfile" and "pcaplist",
        // so this branch covers both pcap modes.
        pkt_io->new_func = pcap_io_new;
        pkt_io->free_func = pcap_io_free;
        pkt_io->done_func = pcap_io_is_done;
        pkt_io->init_func = pcap_io_init;
        pkt_io->recv_func = pcap_io_recv;
        pkt_io->send_func = pcap_io_send;
        pkt_io->drop_func = pcap_io_drop;
        pkt_io->yield_func = pcap_io_yield;
        pkt_io->stat_func = pcap_io_stat;
    }

    for (uint64_t i = 0; i < pkt_io->cfg->thread_num; i++)
    {
        pkt_io->ip_reass[i] = ip_reassembly_new(pkt_io->cfg->timeout_ms,
                                                pkt_io->cfg->frag_queue_num,
                                                pkt_io->cfg->frag_queue_size);
        if (pkt_io->ip_reass[i] == NULL)
        {
            PACKET_IO_LOG_ERROR("failed to create ip_reassembly");
            goto error_out;
        }
    }

    pkt_io->fs = fieldstat_easy_new(pkt_io->cfg->thread_num, "packet_io", NULL, 0);
    if (pkt_io->fs == NULL)
    {
        PACKET_IO_LOG_ERROR("failed to create fieldstat_easy");
        goto error_out;
    }
    if (fieldstat_easy_enable_auto_output(pkt_io->fs, "packet_io.fs4", 2) != 0)
    {
        PACKET_IO_LOG_ERROR("failed to enable auto output for fieldstat_easy");
        goto error_out;
    }

    for (int i = 0; i < PKT_IO_STAT_MAX; i++)
    {
        pkt_io->pkt_io_fs_idx[i] = fieldstat_easy_register_counter(pkt_io->fs, pkt_io_stat_str[i]);
    }
    for (int i = 0; i < IP_REASS_STAT_MAX; i++)
    {
        pkt_io->ip_reass_fs_idx[i] = fieldstat_easy_register_counter(pkt_io->fs, ip_reass_stat_str[i]);
    }

    pkt_io->handle = pkt_io->new_func(toml_file);
    if (pkt_io->handle == NULL)
    {
        PACKET_IO_LOG_ERROR("failed to create packet_io handle");
        goto error_out;
    }

    return pkt_io;

error_out:
    packet_io_free(pkt_io);
    return NULL;
}

/*
 * Destroy a packet_io instance and everything it owns; NULL is allowed.
 *
 * This is also the error path of packet_io_new(), so it must cope with a
 * partially constructed object: the dispatch table may still be zeroed
 * (configuration failed to load), the backend handle may not exist yet, and
 * only a prefix of the reassembler slots may have been created.
 */
void packet_io_free(struct packet_io *pkt_io)
{
    if (pkt_io == NULL)
    {
        return;
    }

    if (pkt_io->cfg)
    {
        for (uint64_t i = 0; i < pkt_io->cfg->thread_num; i++)
        {
            // Slots past the first creation failure are still NULL.
            if (pkt_io->ip_reass[i])
            {
                ip_reassembly_free(pkt_io->ip_reass[i]);
            }
        }
    }

    // free_func is NULL when construction failed before a backend was
    // selected; calling through it would be a NULL function pointer call.
    if (pkt_io->free_func && pkt_io->handle)
    {
        pkt_io->free_func(pkt_io->handle);
    }

    if (pkt_io->fs)
    {
        fieldstat_easy_free(pkt_io->fs);
    }

    packet_io_cfg_free(pkt_io->cfg);
    free(pkt_io);
}

/* Forward to the backend's done_func and return its result. */
int packet_io_is_done(struct packet_io *pkt_io)
{
    return pkt_io->done_func(pkt_io->handle);
}

/* Forward per-thread initialisation for thr_idx to the backend and return its result. */
int packet_io_init(struct packet_io *pkt_io, uint16_t thr_idx)
{
    return pkt_io->init_func(pkt_io->handle, thr_idx);
}

/*
 * Receive up to nr_pkts packets for thread thr_idx into pkts[].
 *
 * Packets reported as fragments by packet_is_fragment() are handed to this
 * thread's IP reassembler; only when ip_reassembly_defrag() returns a packet
 * is that packet placed in the output. Non-fragments pass through unchanged.
 * The output is compacted in place at the front of pkts[].
 *
 * Returns the number of packets left in pkts[], which can be smaller than
 * the number the backend delivered.
 */
int packet_io_recv(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts[], int nr_pkts)
{
    struct ip_reassembly *ip_reass = pkt_io->ip_reass[thr_idx];
    uint64_t now_ms = clock_get_real_time_ms();

    int nr_recv = pkt_io->recv_func(pkt_io->handle, thr_idx, pkts, nr_pkts);
    int nr_ret = 0;

    for (int i = 0; i < nr_recv; i++)
    {
        struct packet *pkt = pkts[i];

        if (!packet_is_fragment(pkt))
        {
            pkts[nr_ret++] = pkt;
            continue;
        }

        // NOTE(review): when no packet is returned the fragment is presumably
        // retained by the reassembler - confirm against ip_reassembly_defrag().
        struct packet *defrag = ip_reassembly_defrag(ip_reass, pkt, now_ms);
        if (defrag)
        {
            pkts[nr_ret++] = defrag;
        }
    }

    return nr_ret;
}

/*
 * Send nr_pkts packets on thread thr_idx.
 *
 * A packet flagged by packet_is_defraged() is not sent itself: each fragment
 * obtained from packet_pop_frag() is sent individually and the container
 * packet is then released with packet_free(). Every entry of pkts[] is set
 * to NULL once handled.
 */
void packet_io_send(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts[], int nr_pkts)
{
    for (int i = 0; i < nr_pkts; i++)
    {
        struct packet *pkt = pkts[i];

        if (packet_is_defraged(pkt))
        {
            struct packet *frag = NULL;
            while ((frag = packet_pop_frag(pkt)))
            {
                // TODO check len vs MTU, if len > MTU, fragment it
                pkt_io->send_func(pkt_io->handle, thr_idx, &frag, 1);
            }
            packet_free(pkt);
        }
        else
        {
            pkt_io->send_func(pkt_io->handle, thr_idx, &pkt, 1);
        }

        pkts[i] = NULL;
    }
}

/*
 * Drop nr_pkts packets on thread thr_idx.
 *
 * Mirrors packet_io_send(): for a packet flagged by packet_is_defraged(),
 * every fragment is handed to the backend's drop_func and the container is
 * released with packet_free(). Every entry of pkts[] is set to NULL.
 */
void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts[], int nr_pkts)
{
    for (int i = 0; i < nr_pkts; i++)
    {
        struct packet *pkt = pkts[i];

        if (packet_is_defraged(pkt))
        {
            struct packet *frag = NULL;
            while ((frag = packet_pop_frag(pkt)))
            {
                pkt_io->drop_func(pkt_io->handle, thr_idx, &frag, 1);
            }
            packet_free(pkt);
        }
        else
        {
            pkt_io->drop_func(pkt_io->handle, thr_idx, &pkt, 1);
        }

        pkts[i] = NULL;
    }
}

/* Forward to the backend's yield_func for thread thr_idx. */
void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx)
{
    pkt_io->yield_func(pkt_io->handle, thr_idx);
}

/*
 * Periodic housekeeping for thread thr_idx.
 *
 * 1. Drains every packet ip_reassembly_clean() returns for the current time
 *    and, according to cfg->fail_action, either sends it (0: bypass) or
 *    drops it (otherwise).
 * 2. At most once per SYNC_STAT_INTERVAL_MS, pushes the increase of each
 *    backend and reassembly counter since the previous sync into fieldstat.
 *
 * The previous counter snapshots live in function-local thread-local
 * storage, so they are per calling thread and not per packet_io instance.
 */
void packet_io_clean(struct packet_io *pkt_io, uint16_t thr_idx)
{
    struct packet *pkt = NULL;
    uint64_t now_ms = clock_get_real_time_ms();
    struct ip_reassembly *ip_reass = pkt_io->ip_reass[thr_idx];

    while ((pkt = ip_reassembly_clean(ip_reass, now_ms)))
    {
        if (pkt_io->cfg->fail_action == 0)
        {
            packet_io_send(pkt_io, thr_idx, &pkt, 1);
        }
        else
        {
            packet_io_drop(pkt_io, thr_idx, &pkt, 1);
        }
    }

    static __thread uint64_t last_sync_stat_ms = 0;
    static __thread struct packet_io_stat pkt_io_last_stat = {0};
    static __thread struct ip_reassembly_stat ip_reass_last_stat = {0};

    if (now_ms - last_sync_stat_ms < SYNC_STAT_INTERVAL_MS)
    {
        return;
    }

    struct packet_io_stat *pkt_io_curr_stat = pkt_io->stat_func(pkt_io->handle, thr_idx);
    struct ip_reassembly_stat *ip_reass_curr_stat = ip_reassembly_get_stat(pkt_io->ip_reass[thr_idx]);

    // fieldstat counters are incremented, so only the delta since the last
    // snapshot is reported.
    for (int i = 0; i < PKT_IO_STAT_MAX; i++)
    {
        uint64_t val = packet_io_stat_get(pkt_io_curr_stat, i) - packet_io_stat_get(&pkt_io_last_stat, i);
        fieldstat_easy_counter_incrby(pkt_io->fs, thr_idx, pkt_io->pkt_io_fs_idx[i], NULL, 0, val);
    }
    for (int i = 0; i < IP_REASS_STAT_MAX; i++)
    {
        uint64_t val = ip_reassembly_stat_get(ip_reass_curr_stat, i) - ip_reassembly_stat_get(&ip_reass_last_stat, i);
        fieldstat_easy_counter_incrby(pkt_io->fs, thr_idx, pkt_io->ip_reass_fs_idx[i], NULL, 0, val);
    }

    pkt_io_last_stat = *pkt_io_curr_stat;
    ip_reass_last_stat = *ip_reass_curr_stat;
    last_sync_stat_ms = now_ms;
}

/*
 * Return the counter selected by type from stat.
 *
 * The switch cases are generated from PKT_IO_STAT_MAP; a type not listed
 * there yields 0.
 */
uint64_t packet_io_stat_get(struct packet_io_stat *stat, enum pkt_io_stat_type type)
{
    switch (type)
    {
#define XX(_type, _name) \
    case _type:          \
        return stat->_name;
        PKT_IO_STAT_MAP(XX)
#undef XX
    default:
        return 0;
    }
}