#include #include #include #include #include #include "pcap_io.h" #include "packet_dump.h" #include "packet_pool.h" #include "packet_parser.h" #include "packet_internal.h" #include "utils_internal.h" #define RING_BUFFER_MAX_SIZE (4096 * 1000) struct pcap_pkt { char *data; int len; struct timeval ts; }; struct pcap_io_cfg { char mode[16]; // pcapfile, pcaplist char pcap_path[PATH_MAX]; uint64_t pcap_done_exit; // range [0, 1] uint64_t thread_num; // range [1, MAX_THREAD_NUM] // packet pool uint64_t capacity; // range: [1, 4294967295] }; struct pcap_io { struct pcap_io_cfg *cfg; pcap_t *pcap; struct logger *logger; struct ring_buffer *ring[MAX_THREAD_NUM]; struct packet_pool *pool[MAX_THREAD_NUM]; struct packet_io_stat stat[MAX_THREAD_NUM]; uint64_t io_thread_need_exit; uint64_t io_thread_is_runing; uint64_t io_thread_wait_exit; uint64_t read_pcap_files; uint64_t read_pcap_pkts; }; /****************************************************************************** * Private API -- ring ******************************************************************************/ struct ring_buffer { uint64_t *buff; uint32_t size; uint32_t head; uint32_t tail; }; static struct ring_buffer *ring_buffer_new(uint32_t size) { struct ring_buffer *ring = (struct ring_buffer *)calloc(1, sizeof(struct ring_buffer)); if (ring == NULL) { PACKET_IO_LOG_ERROR("unable to new ring buffer"); return NULL; } ring->buff = (uint64_t *)calloc(size, sizeof(uint64_t)); if (ring->buff == NULL) { PACKET_IO_LOG_ERROR("unable to new ring buffer"); free(ring); return NULL; } ring->size = size; ring->head = 0; ring->tail = 0; return ring; } static void ring_buffer_free(struct ring_buffer *ring) { if (ring) { if (ring->buff) { free(ring->buff); ring->buff = NULL; } free(ring); ring = NULL; } } static int ring_buffer_push(struct ring_buffer *ring, void *data) { if (__sync_val_compare_and_swap(&ring->buff[ring->tail], 0, data) != 0) { PACKET_IO_LOG_ERROR("ring buffer is full, retry later"); return -1; } ring->tail = (ring->tail + 1) % ring->size; return 0; } static void ring_buffer_pop(struct ring_buffer *ring, void **data) { uint64_t read = ATOMIC_READ(&ring->buff[ring->head]); if (read == 0) { *data = NULL; return; } __sync_val_compare_and_swap(&ring->buff[ring->head], read, 0); *data = (void *)read; ring->head = (ring->head + 1) % ring->size; } /****************************************************************************** * Private API -- config ******************************************************************************/ static struct pcap_io_cfg *pcap_io_cfg_new(const char *toml_file) { struct pcap_io_cfg *cfg = (struct pcap_io_cfg *)calloc(1, sizeof(struct pcap_io_cfg)); if (cfg == NULL) { return NULL; } int ret = 0; ret += load_toml_str_config(toml_file, "packet_io.mode", cfg->mode); ret += load_toml_str_config(toml_file, "packet_io.pcap_path", cfg->pcap_path); ret += load_toml_integer_config(toml_file, "packet_io.pcap_done_exit", &cfg->pcap_done_exit, 0, 1); ret += load_toml_integer_config(toml_file, "packet_io.thread_num", &cfg->thread_num, 1, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "packet_io.packet_pool.capacity", &cfg->capacity, 1, 4294967295); if (strcmp(cfg->mode, "pcapfile") != 0 && strcmp(cfg->mode, "pcaplist") != 0) { PACKET_IO_LOG_ERROR("config file invalid packet_io.mode %s", cfg->mode); ret = -1; } if (ret != 0) { free(cfg); return NULL; } else { return cfg; } } static void pcap_io_cfg_free(struct pcap_io_cfg *cfg) { if (cfg) { free(cfg); cfg = NULL; } } static void pcap_io_cfg_print(const struct pcap_io_cfg *cfg) { if (cfg) { PACKET_IO_LOG_INFO("packet_io.pcap_path : %s", cfg->pcap_path); PACKET_IO_LOG_INFO("packet_io.pcap_done_exit : %lu", cfg->pcap_done_exit); PACKET_IO_LOG_INFO("packet_io.packet_pool.capacity : %lu", cfg->capacity); } } /****************************************************************************** * Private API -- pcap ******************************************************************************/ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes) { struct pcap_io *pcap_io = (struct pcap_io *)user; struct pcap_pkt *pcap = (struct pcap_pkt *)calloc(1, sizeof(struct pcap_pkt) + h->caplen); if (pcap == NULL) { PACKET_IO_LOG_ERROR("unable to alloc packet"); return; } pcap->data = (char *)pcap + sizeof(struct pcap_pkt); pcap->len = h->caplen; pcap->ts = h->ts; memcpy((char *)pcap->data, bytes, h->caplen); ATOMIC_INC(&pcap_io->read_pcap_pkts); struct packet pkt; memset(&pkt, 0, sizeof(struct packet)); packet_parse(&pkt, pcap->data, pcap->len); uint64_t hash = packet_ldbc_hash(&pkt, PKT_LDBC_METH_OUTERMOST_INT_EXT_IP, PACKET_DIRECTION_OUTGOING); struct ring_buffer *ring = pcap_io->ring[hash % pcap_io->cfg->thread_num]; while (ring_buffer_push(ring, pcap) == -1) { if (ATOMIC_READ(&pcap_io->io_thread_need_exit)) { free(pcap); PACKET_IO_LOG_FATAL("pcap io thread need exit"); pcap_breakloop(pcap_io->pcap); break; } usleep(1000); } if (ATOMIC_READ(&pcap_io->io_thread_need_exit)) { PACKET_IO_LOG_FATAL("pcap io thread need exit"); pcap_breakloop(pcap_io->pcap); } } static int pcap_io_handler(struct pcap_io *pcap_io, const char *pcap_file) { char resolved_path[256]; char pcap_errbuf[PCAP_ERRBUF_SIZE]; realpath(pcap_file, resolved_path); PACKET_IO_LOG_FATAL("pcap %s in-processing", resolved_path) pcap_io->pcap = pcap_open_offline(resolved_path, pcap_errbuf); if (pcap_io->pcap == NULL) { PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); return -1; } pcap_io->read_pcap_files++; pcap_loop(pcap_io->pcap, -1, pcap_pkt_handler, (u_char *)pcap_io); pcap_close(pcap_io->pcap); PACKET_IO_LOG_FATAL("pcap %s processed", resolved_path) return 0; } static int all_packet_consumed(struct pcap_io *pcap_io) { uint64_t consumed_pkts = 0; uint64_t read_pcap_pkts = ATOMIC_READ(&pcap_io->read_pcap_pkts); for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++) { consumed_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_rx); } if (consumed_pkts == read_pcap_pkts) { return 1; } else { return 0; } } static void *pcap_io_thread(void *arg) { struct pcap_io *pcap_io = (struct pcap_io *)arg; __thread_local_logger = pcap_io->logger; ATOMIC_SET(&pcap_io->io_thread_is_runing, 1); PACKET_IO_LOG_FATAL("pcap io thread is running"); if (strcmp(pcap_io->cfg->mode, "pcapfile") == 0) { pcap_io_handler(pcap_io, pcap_io->cfg->pcap_path); } else { FILE *fp = NULL; if (strcmp(pcap_io->cfg->pcap_path, "-") == 0) { PACKET_IO_LOG_ERROR("pcap path is empty, read from stdin"); fp = stdin; } else { fp = fopen(pcap_io->cfg->pcap_path, "r"); if (fp == NULL) { PACKET_IO_LOG_ERROR("unable to open pcap path: %s", pcap_io->cfg->pcap_path); goto erro_out; } } char line[PATH_MAX]; while (ATOMIC_READ(&pcap_io->io_thread_need_exit) == 0 && fgets(line, sizeof(line), fp)) { if (line[0] == '#') { continue; } char *pos = strchr(line, '\n'); if (pos) { *pos = '\0'; } pcap_io_handler(pcap_io, line); } if (fp != stdin) { fclose(fp); } } PACKET_IO_LOG_FATAL("pcap io thread read all pcap files (files: %lu, pkts: %lu)", pcap_io->read_pcap_files, ATOMIC_READ(&pcap_io->read_pcap_pkts)); erro_out: while (ATOMIC_READ(&pcap_io->io_thread_need_exit) == 0) { if (all_packet_consumed(pcap_io)) { ATOMIC_SET(&pcap_io->io_thread_wait_exit, 1); } usleep(1000); // 1ms } PACKET_IO_LOG_FATAL("pcap io thread exit"); ATOMIC_SET(&pcap_io->io_thread_is_runing, 0); return NULL; } static void origin_free_cb(struct packet *pkt, void *args) { struct pcap_io *pcap_io = (struct pcap_io *)args; struct packet_origin *origin = packet_get_origin(pkt); struct pcap_pkt *pcap = origin->ctx; struct packet_io_stat *stat = &pcap_io->stat[origin->thr_idx]; struct packet_pool *pool = pcap_io->pool[origin->thr_idx]; stat->pkts_user_freed++; stat->bytes_user_freed += packet_get_raw_len(pkt); free(pcap); packet_pool_push(pool, pkt); } /****************************************************************************** * Public API ******************************************************************************/ void *pcap_io_new(const char *toml_file) { pthread_t tid; struct pcap_io *pcap_io = (struct pcap_io *)calloc(1, sizeof(struct pcap_io)); if (pcap_io == NULL) { PACKET_IO_LOG_ERROR("unable to allocate memory for pcap_io"); return NULL; } pcap_io->cfg = pcap_io_cfg_new(toml_file); if (pcap_io->cfg == NULL) { PACKET_IO_LOG_ERROR("unable to create pcap_io_cfg"); goto error_out; } pcap_io_cfg_print(pcap_io->cfg); pcap_io->logger = __thread_local_logger; for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++) { pcap_io->ring[i] = ring_buffer_new(RING_BUFFER_MAX_SIZE); if (pcap_io->ring[i] == NULL) { PACKET_IO_LOG_ERROR("unable to create ring buffer"); goto error_out; } pcap_io->pool[i] = packet_pool_new(pcap_io->cfg->capacity); if (pcap_io->pool[i] == NULL) { PACKET_IO_LOG_ERROR("unable to create packet pool"); goto error_out; } } if (pthread_create(&tid, NULL, pcap_io_thread, (void *)pcap_io) != 0) { PACKET_IO_LOG_ERROR("unable to create pcap io thread"); goto error_out; } return pcap_io; error_out: pcap_io_free(pcap_io); return NULL; } void pcap_io_free(void *handle) { struct pcap_io *pcap_io = (struct pcap_io *)handle; if (pcap_io) { ATOMIC_SET(&pcap_io->io_thread_need_exit, 1); while (ATOMIC_READ(&pcap_io->io_thread_is_runing)) { usleep(1000); } struct pcap_pkt *pcap = NULL; for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++) { while (1) { ring_buffer_pop(pcap_io->ring[i], (void **)&pcap); if (pcap) { free(pcap); } else { break; } } packet_pool_free(pcap_io->pool[i]); ring_buffer_free(pcap_io->ring[i]); } pcap_io_cfg_free(pcap_io->cfg); free(pcap_io); pcap_io = NULL; } } int pcap_io_is_done(void *handle) { struct pcap_io *pcap_io = (struct pcap_io *)handle; if (pcap_io->cfg->pcap_done_exit) { return ATOMIC_READ(&pcap_io->io_thread_wait_exit); } else { return 0; } } int pcap_io_init(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) { return 0; } int pcap_io_recv(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { struct packet *pkt = NULL; struct pcap_pkt *pcap = NULL; struct pcap_io *pcap_io = (struct pcap_io *)handle; struct ring_buffer *ring = pcap_io->ring[thr_idx]; struct packet_pool *pool = pcap_io->pool[thr_idx]; struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; int ret = 0; for (int i = 0; i < nr_pkts; i++) { ring_buffer_pop(ring, (void **)&pcap); if (pcap == NULL) { break; } stat->pkts_rx++; stat->bytes_rx += pcap->len; pkt = packet_pool_pop(pool); assert(pkt != NULL); struct packet_origin origin = { .type = ORIGIN_TYPE_PCAP, .ctx = pcap, .cb = origin_free_cb, .args = pcap_io, .thr_idx = thr_idx, }; packet_parse(pkt, pcap->data, pcap->len); memset(&pkt->meta, 0, sizeof(pkt->meta)); packet_set_action(pkt, PACKET_ACTION_FORWARD); packet_set_timeval(pkt, &pcap->ts); packet_set_origin(pkt, &origin); pkts[ret++] = pkt; } return ret; } void pcap_io_send(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { int len = 0; struct tuple6 tuple; char file[PATH_MAX] = {0}; struct packet *pkt = NULL; struct pcap_pkt *pcap = NULL; struct packet_origin *origin = NULL; char src_addr_str[INET6_ADDRSTRLEN] = {0}; char dst_addr_str[INET6_ADDRSTRLEN] = {0}; struct pcap_io *pcap_io = (struct pcap_io *)handle; struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; for (int i = 0; i < nr_pkts; i++) { pkt = pkts[i]; len = packet_get_raw_len(pkt); origin = packet_get_origin(pkt); stat->pkts_tx++; stat->bytes_tx += len; if (origin->type == ORIGIN_TYPE_PCAP) { pcap = (struct pcap_pkt *)origin->ctx; free(pcap); packet_pool_push(pcap_io->pool[thr_idx], pkt); } else { stat->pkts_injected++; stat->bytes_injected += len; memset(&tuple, 0, sizeof(struct tuple6)); packet_get_innermost_tuple6(pkt, &tuple); if (tuple.addr_family == AF_INET) { inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr_str, INET6_ADDRSTRLEN); inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr_str, INET6_ADDRSTRLEN); } else { inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr_str, INET6_ADDRSTRLEN); inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr_str, INET6_ADDRSTRLEN); } snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr_str, ntohs(tuple.src_port), dst_addr_str, ntohs(tuple.dst_port), stat->pkts_injected); if (packet_dump_pcap(pkt, file) == -1) { PACKET_IO_LOG_ERROR("unable to dump pcap file: %s", file); } else { PACKET_IO_LOG_FATAL("dump inject packet: %s", file); } packet_free(pkt); } pkts[i] = NULL; } } void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { int len = 0; struct packet *pkt = NULL; struct pcap_pkt *pcap = NULL; struct packet_origin *origin = NULL; struct pcap_io *pcap_io = (struct pcap_io *)handle; struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; for (int i = 0; i < nr_pkts; i++) { pkt = pkts[i]; len = packet_get_raw_len(pkt); origin = packet_get_origin(pkt); stat->pkts_dropped++; stat->bytes_dropped += len; if (origin->type == ORIGIN_TYPE_PCAP) { pcap = (struct pcap_pkt *)origin->ctx; free(pcap); packet_pool_push(pcap_io->pool[thr_idx], pkt); } else { packet_free(pkt); } pkts[i] = NULL; } } void pcap_io_yield(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) { return; } struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx) { struct pcap_io *pcap_io = (struct pcap_io *)handle; return &pcap_io->stat[thr_idx]; }