#include #include #include #include #include #include #include #include #include #include #include "tuple.h" #include "utils.h" #include "log_internal.h" #include "pcap_io.h" #include "packet_dump.h" #include "packet_parser.h" #include "packet_internal.h" #define PCAP_IO_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "pcap io", format, ##__VA_ARGS__) #define PCAP_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "pcap io", format, ##__VA_ARGS__) #define MAX_PACKET_QUEUE_SIZE (4096 * 1000) struct pcap_io { struct packet_io_config cfg; pcap_t *pcap; struct logger *logger; struct packet_queue *queue[MAX_THREAD_NUM]; struct packet_io_stat stat[MAX_THREAD_NUM]; uint64_t io_thread_need_exit; uint64_t io_thread_is_runing; uint64_t io_thread_wait_exit; uint64_t read_pcap_files; uint64_t read_pcap_pkts; }; struct pcap_pkt { char *data; int len; struct timeval ts; }; /****************************************************************************** * Private API -- queue ******************************************************************************/ struct packet_queue { uint64_t *queue; uint32_t size; uint32_t head; uint32_t tail; }; static struct packet_queue *packet_queue_new(uint32_t size) { struct packet_queue *queue = (struct packet_queue *)calloc(1, sizeof(struct packet_queue)); if (queue == NULL) { PCAP_IO_LOG_ERROR("unable to new packet queue"); return NULL; } queue->queue = (uint64_t *)calloc(size, sizeof(uint64_t)); if (queue->queue == NULL) { PCAP_IO_LOG_ERROR("unable to new packet queue"); free(queue); return NULL; } queue->size = size; queue->head = 0; queue->tail = 0; return queue; } static void packet_queue_free(struct packet_queue *queue) { if (queue == NULL) { return; } if (queue->queue) { free(queue->queue); queue->queue = NULL; } free(queue); } static int packet_queue_push(struct packet_queue *queue, void *data) { if (__sync_val_compare_and_swap(&queue->queue[queue->tail], 0, data) != 0) { PCAP_IO_LOG_ERROR("packet queue is full, retry later"); return -1; } queue->tail = (queue->tail + 1) % queue->size; return 0; } static void packet_queue_pop(struct packet_queue *queue, void **data) { uint64_t read = ATOMIC_READ(&queue->queue[queue->head]); if (read == 0) { *data = NULL; return; } __sync_val_compare_and_swap(&queue->queue[queue->head], read, 0); *data = (void *)read; queue->head = (queue->head + 1) % queue->size; } /****************************************************************************** * Private API -- utils ******************************************************************************/ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes) { struct pcap_io *handle = (struct pcap_io *)user; // copy packet data to new memory struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)calloc(1, sizeof(struct pcap_pkt) + h->caplen); if (pcap_pkt == NULL) { PCAP_IO_LOG_ERROR("unable to alloc packet"); return; } pcap_pkt->data = (char *)pcap_pkt + sizeof(struct pcap_pkt); pcap_pkt->len = h->caplen; pcap_pkt->ts = h->ts; memcpy((char *)pcap_pkt->data, bytes, h->caplen); ATOMIC_INC(&handle->read_pcap_pkts); // calculate packet hash struct packet pkt; memset(&pkt, 0, sizeof(struct packet)); packet_parse(&pkt, pcap_pkt->data, pcap_pkt->len); uint64_t hash = packet_ldbc_hash(&pkt, PKT_LDBC_METH_OUTERMOST_INT_EXT_IP, PACKET_DIRECTION_OUTGOING); // push packet to queue struct packet_queue *queue = handle->queue[hash % handle->cfg.nr_worker_thread]; while (packet_queue_push(queue, pcap_pkt) == -1) { if (ATOMIC_READ(&handle->io_thread_need_exit)) { free(pcap_pkt); PCAP_IO_LOG_FATAL("pcap io thread need exit"); pcap_breakloop(handle->pcap); break; } usleep(1000); } if (ATOMIC_READ(&handle->io_thread_need_exit)) { PCAP_IO_LOG_FATAL("pcap io thread need exit"); pcap_breakloop(handle->pcap); } } static int pcap_io_handler(struct pcap_io *handle, const char *pcap_file) { char resolved_path[256]; char pcap_errbuf[PCAP_ERRBUF_SIZE]; realpath(pcap_file, resolved_path); PCAP_IO_LOG_FATAL("pcap %s in-processing", resolved_path) handle->pcap = pcap_open_offline(resolved_path, pcap_errbuf); if (handle->pcap == NULL) { PCAP_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); return -1; } handle->read_pcap_files++; pcap_loop(handle->pcap, -1, pcap_pkt_handler, (u_char *)handle); pcap_close(handle->pcap); PCAP_IO_LOG_FATAL("pcap %s processed", resolved_path) return 0; } static int all_packet_consumed(struct pcap_io *handle) { uint64_t consumed_pkts = 0; uint64_t read_pcap_pkts = ATOMIC_READ(&handle->read_pcap_pkts); for (uint16_t i = 0; i < handle->cfg.nr_worker_thread; i++) { consumed_pkts += ATOMIC_READ(&handle->stat[i].pkts_rx); } if (consumed_pkts == read_pcap_pkts) { return 1; } else { return 0; } } static void *pcap_io_thread(void *arg) { struct pcap_io *handle = (struct pcap_io *)arg; __thread_local_logger = handle->logger; ATOMIC_SET(&handle->io_thread_is_runing, 1); PCAP_IO_LOG_FATAL("pcap io thread is running"); if (handle->cfg.mode == PACKET_IO_PCAPFILE) { pcap_io_handler(handle, handle->cfg.pcap_path); } else // PACKET_IO_PCAPLIST { FILE *fp = NULL; if (strcmp(handle->cfg.pcap_path, "-") == 0) { PCAP_IO_LOG_ERROR("pcap path is empty, read from stdin"); fp = stdin; } else { fp = fopen(handle->cfg.pcap_path, "r"); if (fp == NULL) { PCAP_IO_LOG_ERROR("unable to open pcap path: %s", handle->cfg.pcap_path); goto erro_out; } } char line[PATH_MAX]; while (ATOMIC_READ(&handle->io_thread_need_exit) == 0 && fgets(line, sizeof(line), fp)) { if (line[0] == '#') { continue; } char *pos = strchr(line, '\n'); if (pos) { *pos = '\0'; } pcap_io_handler(handle, line); } if (fp != stdin) { fclose(fp); } } PCAP_IO_LOG_FATAL("pcap io thread read all pcap files"); erro_out: while (ATOMIC_READ(&handle->io_thread_need_exit) == 0) { if (all_packet_consumed(handle)) { ATOMIC_SET(&handle->io_thread_wait_exit, 1); } usleep(1000); // 1ms } PCAP_IO_LOG_FATAL("pcap io thread exit (read_pcap_files: %lu, read_pcap_pkts: %lu)", handle->read_pcap_files, ATOMIC_READ(&handle->read_pcap_pkts)); ATOMIC_SET(&handle->io_thread_is_runing, 0); return NULL; } /****************************************************************************** * Public API ******************************************************************************/ void *pcap_io_new(const struct packet_io_config *cfg) { pthread_t tid; struct pcap_io *handle = (struct pcap_io *)calloc(1, sizeof(struct pcap_io)); if (handle == NULL) { PCAP_IO_LOG_ERROR("unable to allocate memory for pcap_io"); return NULL; } handle->logger = __thread_local_logger; memcpy(&handle->cfg, cfg, sizeof(struct packet_io_config)); for (uint16_t i = 0; i < handle->cfg.nr_worker_thread; i++) { handle->queue[i] = packet_queue_new(MAX_PACKET_QUEUE_SIZE); if (handle->queue[i] == NULL) { PCAP_IO_LOG_ERROR("unable to create packet queue"); goto error_out; } } if (pthread_create(&tid, NULL, pcap_io_thread, (void *)handle) != 0) { PCAP_IO_LOG_ERROR("unable to create pcap io thread"); goto error_out; } return handle; error_out: pcap_io_free(handle); return NULL; } void pcap_io_free(void *handle) { struct pcap_io *pcap_io = (struct pcap_io *)handle; if (pcap_io) { ATOMIC_SET(&pcap_io->io_thread_need_exit, 1); while (ATOMIC_READ(&pcap_io->io_thread_is_runing)) { usleep(1000); } struct pcap_pkt *pcap_pkt = NULL; for (uint16_t i = 0; i < pcap_io->cfg.nr_worker_thread; i++) { while (1) { packet_queue_pop(pcap_io->queue[i], (void **)&pcap_pkt); if (pcap_pkt) { free(pcap_pkt); } else { break; } } packet_queue_free(pcap_io->queue[i]); } free(pcap_io); pcap_io = NULL; } } int pcap_io_isbreak(void *handle) { struct pcap_io *pcap_io = (struct pcap_io *)handle; return ATOMIC_READ(&pcap_io->io_thread_wait_exit); } int pcap_io_init(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) { return 0; } uint16_t pcap_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { uint16_t nr_packet_parsed = 0; struct packet *pkt = NULL; struct pcap_pkt *pcap_pkt = NULL; struct pcap_io *pcap_io = (struct pcap_io *)handle; struct packet_queue *queue = pcap_io->queue[thr_idx]; struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; for (uint16_t i = 0; i < nr_pkts; i++) { packet_queue_pop(queue, (void **)&pcap_pkt); if (pcap_pkt == NULL) { break; } else { ATOMIC_INC(&stat->pkts_rx); stat->bytes_rx += pcap_pkt->len; stat->raw_pkts_rx++; stat->raw_bytes_rx += pcap_pkt->len; pkt = &pkts[nr_packet_parsed]; packet_parse(pkt, pcap_pkt->data, pcap_pkt->len); memset(&pkt->meta, 0, sizeof(pkt->meta)); packet_set_origin_ctx(pkt, pcap_pkt); packet_set_action(pkt, PACKET_ACTION_FORWARD); packet_set_timeval(pkt, &pcap_pkt->ts); nr_packet_parsed++; } } return nr_packet_parsed; } void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { int len; struct tuple6 tuple; struct packet *pkt = NULL; struct pcap_io *pcap_io = (struct pcap_io *)handle; struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; char file[PATH_MAX] = {0}; char src_addr[INET6_ADDRSTRLEN] = {0}; char dst_addr[INET6_ADDRSTRLEN] = {0}; for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; len = packet_get_raw_len(pkt); stat->pkts_tx++; stat->bytes_tx += len; if (packet_is_ctrl(pkt)) { stat->ctrl_pkts_tx++; stat->ctrl_bytes_tx += len; } else { stat->raw_pkts_tx++; stat->raw_bytes_tx += len; } struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt); if (pcap_pkt) { free(pcap_pkt); } else { stat->pkts_injected++; stat->bytes_injected += len; memset(&tuple, 0, sizeof(struct tuple6)); packet_get_innermost_tuple6(pkt, &tuple); if (tuple.addr_family == AF_INET) { inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr, INET6_ADDRSTRLEN); inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr, INET6_ADDRSTRLEN); } else { inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN); inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN); } snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected); if (packet_dump_pcap(pkt, file) == -1) { PCAP_IO_LOG_ERROR("unable to dump pcap file: %s", file); } else { PCAP_IO_LOG_FATAL("dump inject packet: %s", file); } } } } void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { struct packet *pkt = NULL; struct pcap_io *pcap_io = (struct pcap_io *)handle; struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt); if (pcap_pkt) { stat->pkts_dropped++; stat->bytes_dropped += packet_get_raw_len(pkt); free(pcap_pkt); } packet_free(pkt); } } void pcap_io_yield(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) { return; } struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx) { struct pcap_io *pcap_io = (struct pcap_io *)handle; return &pcap_io->stat[thr_idx]; }