#include #include #include #include #include #include #include #include #include #include #include "tuple.h" #include "utils.h" #include "log_private.h" #include "dumpfile_io.h" #include "packet_dump.h" #include "packet_parser.h" #include "packet_private.h" #define PACKET_IO_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "dumpfile", format, ##__VA_ARGS__) #define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "dumpfile", format, ##__VA_ARGS__) #define MAX_PACKET_QUEUE_SIZE (4096 * 1000) struct dumpfile_io { enum packet_io_mode mode; uint16_t nr_threads; char dumpfile_path[256]; pcap_t *pcap; struct logger *logger; struct packet_queue *queue[MAX_THREAD_NUM]; struct packet_io_stat stat[MAX_THREAD_NUM]; uint64_t io_thread_need_exit; uint64_t io_thread_is_runing; uint64_t io_thread_wait_exit; uint64_t read_pcap_files; uint64_t read_pcap_pkts; }; struct pcap_pkt { char *data; int len; struct timeval ts; }; /****************************************************************************** * Private API -- queue ******************************************************************************/ struct packet_queue { uint64_t *queue; uint32_t size; uint32_t head; uint32_t tail; }; static struct packet_queue *packet_queue_new(uint32_t size) { struct packet_queue *queue = (struct packet_queue *)calloc(1, sizeof(struct packet_queue)); if (queue == NULL) { PACKET_IO_LOG_ERROR("unable to new packet queue"); return NULL; } queue->queue = (uint64_t *)calloc(size, sizeof(uint64_t)); if (queue->queue == NULL) { PACKET_IO_LOG_ERROR("unable to new packet queue"); free(queue); return NULL; } queue->size = size; queue->head = 0; queue->tail = 0; return queue; } static void packet_queue_free(struct packet_queue *queue) { if (queue == NULL) { return; } if (queue->queue) { free(queue->queue); queue->queue = NULL; } free(queue); } static int packet_queue_push(struct packet_queue *queue, void *data) { if (__sync_val_compare_and_swap(&queue->queue[queue->tail], 0, data) != 0) { PACKET_IO_LOG_ERROR("packet queue is full, retry later"); return -1; } queue->tail = (queue->tail + 1) % queue->size; return 0; } static void packet_queue_pop(struct packet_queue *queue, void **data) { uint64_t read = ATOMIC_READ(&queue->queue[queue->head]); if (read == 0) { *data = NULL; return; } __sync_val_compare_and_swap(&queue->queue[queue->head], read, 0); *data = (void *)read; queue->head = (queue->head + 1) % queue->size; } /****************************************************************************** * Private API -- utils ******************************************************************************/ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes) { struct dumpfile_io *handle = (struct dumpfile_io *)user; // copy packet data to new memory struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)calloc(1, sizeof(struct pcap_pkt) + h->caplen); if (pcap_pkt == NULL) { PACKET_IO_LOG_ERROR("unable to alloc packet"); return; } pcap_pkt->data = (char *)pcap_pkt + sizeof(struct pcap_pkt); pcap_pkt->len = h->caplen; pcap_pkt->ts = h->ts; memcpy((char *)pcap_pkt->data, bytes, h->caplen); ATOMIC_INC(&handle->read_pcap_pkts); // calculate packet hash struct packet pkt; memset(&pkt, 0, sizeof(struct packet)); packet_parse(&pkt, pcap_pkt->data, pcap_pkt->len); uint64_t hash = packet_ldbc_hash(&pkt, PKT_LDBC_METH_OUTERMOST_INT_EXT_IP, PACKET_DIRECTION_OUTGOING); // push packet to queue struct packet_queue *queue = handle->queue[hash % handle->nr_threads]; while (packet_queue_push(queue, pcap_pkt) == -1) { if (ATOMIC_READ(&handle->io_thread_need_exit)) { free(pcap_pkt); PACKET_IO_LOG_FATAL("dumpfile io thread need exit"); pcap_breakloop(handle->pcap); break; } usleep(1000); } if (ATOMIC_READ(&handle->io_thread_need_exit)) { PACKET_IO_LOG_FATAL("dumpfile io thread need exit"); pcap_breakloop(handle->pcap); } } static int dumpfile_handler(struct dumpfile_io *handle, const char *pcap_file) { char resolved_path[256]; char pcap_errbuf[PCAP_ERRBUF_SIZE]; realpath(pcap_file, resolved_path); PACKET_IO_LOG_FATAL("dumpfile %s in-processing", resolved_path) handle->pcap = pcap_open_offline(resolved_path, pcap_errbuf); if (handle->pcap == NULL) { PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); return -1; } handle->read_pcap_files++; pcap_loop(handle->pcap, -1, pcap_pkt_handler, (u_char *)handle); pcap_close(handle->pcap); PACKET_IO_LOG_FATAL("dumpfile %s processed", resolved_path) return 0; } static int all_packet_consumed(struct dumpfile_io *handle) { uint64_t consumed_pkts = 0; uint64_t read_pcap_pkts = ATOMIC_READ(&handle->read_pcap_pkts); for (uint16_t i = 0; i < handle->nr_threads; i++) { consumed_pkts += ATOMIC_READ(&handle->stat[i].pkts_rx); } if (consumed_pkts == read_pcap_pkts) { return 1; } else { return 0; } } static void *dumpfile_thread(void *arg) { struct dumpfile_io *handle = (struct dumpfile_io *)arg; __thread_local_logger = handle->logger; ATOMIC_SET(&handle->io_thread_is_runing, 1); PACKET_IO_LOG_FATAL("dumpfile io thread is running"); if (handle->mode == PACKET_IO_DUMPFILE) { dumpfile_handler(handle, handle->dumpfile_path); } else // PACKET_IO_DUMPFILELIST { FILE *fp = NULL; if (strcmp(handle->dumpfile_path, "-") == 0) { PACKET_IO_LOG_ERROR("dumpfile list is empty, read from stdin"); fp = stdin; } else { fp = fopen(handle->dumpfile_path, "r"); if (fp == NULL) { PACKET_IO_LOG_ERROR("unable to open dumpfile list: %s", handle->dumpfile_path); goto erro_out; } } char line[PATH_MAX]; while (ATOMIC_READ(&handle->io_thread_need_exit) == 0 && fgets(line, sizeof(line), fp)) { if (line[0] == '#') { continue; } char *pos = strchr(line, '\n'); if (pos) { *pos = '\0'; } dumpfile_handler(handle, line); } if (fp != stdin) { fclose(fp); } } PACKET_IO_LOG_FATAL("dumpfile io thread read all pcap files"); erro_out: while (ATOMIC_READ(&handle->io_thread_need_exit) == 0) { if (all_packet_consumed(handle)) { ATOMIC_SET(&handle->io_thread_wait_exit, 1); } usleep(1000); // 1ms } PACKET_IO_LOG_FATAL("dumpfile io thread exit (read_pcap_files: %lu, read_pcap_pkts: %lu)", handle->read_pcap_files, ATOMIC_READ(&handle->read_pcap_pkts)); ATOMIC_SET(&handle->io_thread_is_runing, 0); return NULL; } /****************************************************************************** * Public API ******************************************************************************/ struct dumpfile_io *dumpfile_io_new(const char *dumpfile_path, enum packet_io_mode mode, uint16_t nr_threads) { pthread_t tid; struct dumpfile_io *handle = (struct dumpfile_io *)calloc(1, sizeof(struct dumpfile_io)); if (handle == NULL) { PACKET_IO_LOG_ERROR("unable to allocate memory for dumpfile_io"); return NULL; } handle->mode = mode; handle->nr_threads = nr_threads; handle->logger = __thread_local_logger; strncpy(handle->dumpfile_path, dumpfile_path, MIN(strlen(dumpfile_path), sizeof(handle->dumpfile_path))); for (uint16_t i = 0; i < handle->nr_threads; i++) { handle->queue[i] = packet_queue_new(MAX_PACKET_QUEUE_SIZE); if (handle->queue[i] == NULL) { PACKET_IO_LOG_ERROR("unable to create packet queue"); goto error_out; } } if (pthread_create(&tid, NULL, dumpfile_thread, (void *)handle) != 0) { PACKET_IO_LOG_ERROR("unable to create packet io thread"); goto error_out; } return handle; error_out: dumpfile_io_free(handle); return NULL; } void dumpfile_io_free(struct dumpfile_io *handle) { if (handle) { ATOMIC_SET(&handle->io_thread_need_exit, 1); while (ATOMIC_READ(&handle->io_thread_is_runing)) { usleep(1000); } struct pcap_pkt *pcap_pkt = NULL; for (uint16_t i = 0; i < handle->nr_threads; i++) { while (1) { packet_queue_pop(handle->queue[i], (void **)&pcap_pkt); if (pcap_pkt) { free(pcap_pkt); } else { break; } } packet_queue_free(handle->queue[i]); } free(handle); handle = NULL; } } int dumpfile_io_isbreak(struct dumpfile_io *handle) { return ATOMIC_READ(&handle->io_thread_wait_exit); } int dumpfile_io_init(struct dumpfile_io *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) { return 0; } uint16_t dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { struct packet_queue *queue = handle->queue[thr_idx]; struct packet_io_stat *stat = &handle->stat[thr_idx]; struct pcap_pkt *pcap_pkt = NULL; struct packet *pkt; uint16_t nr_parsed = 0; for (uint16_t i = 0; i < nr_pkts; i++) { packet_queue_pop(queue, (void **)&pcap_pkt); if (pcap_pkt == NULL) { break; } else { ATOMIC_INC(&stat->pkts_rx); stat->bytes_rx += pcap_pkt->len; stat->raw_pkts_rx++; stat->raw_bytes_rx += pcap_pkt->len; pkt = &pkts[nr_parsed]; packet_parse(pkt, pcap_pkt->data, pcap_pkt->len); memset(&pkt->meta, 0, sizeof(pkt->meta)); packet_set_origin_ctx(pkt, pcap_pkt); packet_set_action(pkt, PACKET_ACTION_FORWARD); packet_set_timeval(pkt, &pcap_pkt->ts); nr_parsed++; } } return nr_parsed; } void dumpfile_io_egress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { int len; struct packet *pkt = NULL; struct packet_io_stat *stat = &handle->stat[thr_idx]; for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; len = packet_get_raw_len(pkt); stat->pkts_tx++; stat->bytes_tx += len; stat->raw_pkts_tx++; stat->raw_bytes_tx += len; struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt); if (pcap_pkt) { free(pcap_pkt); } packet_free(pkt); } } void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { struct packet *pkt = NULL; struct packet_io_stat *stat = &handle->stat[thr_idx]; for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt); if (pcap_pkt) { stat->pkts_dropped++; stat->bytes_dropped += packet_get_raw_len(pkt); free(pcap_pkt); } packet_free(pkt); } } uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { uint16_t len; struct packet *pkt = NULL; struct packet_io_stat *stat = &handle->stat[thr_idx]; struct tuple6 tuple; char file[1024] = {0}; char src_addr[INET6_ADDRSTRLEN] = {0}; char dst_addr[INET6_ADDRSTRLEN] = {0}; for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; len = packet_get_raw_len(pkt); stat->pkts_injected++; stat->bytes_injected += len; stat->raw_pkts_tx++; stat->raw_bytes_tx += len; stat->pkts_tx++; stat->bytes_tx += len; memset(&tuple, 0, sizeof(struct tuple6)); packet_get_innermost_tuple6(pkt, &tuple); if (tuple.addr_family == AF_INET) { inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr, INET6_ADDRSTRLEN); inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr, INET6_ADDRSTRLEN); } else { inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN); inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN); } snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected); if (packet_dump_pcap(pkt, file) == -1) { PACKET_IO_LOG_ERROR("unable to dump pcap file: %s", file); } else { PACKET_IO_LOG_FATAL("dump inject packet: %s", file); } packet_free(pkt); } return nr_pkts; } void dumpfile_io_yield(struct dumpfile_io *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused)), uint64_t timeout_ms __attribute__((unused))) { return; } struct packet_io_stat *dumpfile_io_stat(struct dumpfile_io *handle, uint16_t thr_idx) { return &handle->stat[thr_idx]; }