#include #include #include #include #include #include "pcap_io.h" #include "packet_dump.h" #include "packet_pool.h" #include "packet_parser.h" #include "ip_reassembly.h" #include "log_internal.h" #include "packet_internal.h" #include "utils_internal.h" #define PCAP_IO_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "pcap IO", format, ##__VA_ARGS__) #define PCAP_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "pcap IO", format, ##__VA_ARGS__) #define PCAP_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "pcap IO", format, ##__VA_ARGS__) #define RING_BUFFER_MAX_SIZE (4096 * 1000) struct pcap_pkt { char *data; int len; struct timeval ts; }; struct pcap_io_cfg { char mode[16]; // pcapfile, pcaplist char pcap_path[PATH_MAX]; uint16_t thread_num; // range [1, MAX_THREAD_NUM] // packet pool uint64_t capacity; // range: [1, 4294967295] // ip reassembly uint64_t fail_action; // 0: bypass, 1: drop uint64_t timeout_ms; // range: [1, 60000] (ms) uint64_t frag_queue_num; // range: [1, 4294967295 uint64_t frag_queue_size; // range: [2, 65535] }; struct pcap_io { struct pcap_io_cfg *cfg; pcap_t *pcap; struct logger *logger; struct ring_buffer *ring[MAX_THREAD_NUM]; struct packet_pool *pool[MAX_THREAD_NUM]; struct packet_io_stat stat[MAX_THREAD_NUM]; struct ip_reassembly *ip_reass[MAX_THREAD_NUM]; uint64_t io_thread_need_exit; uint64_t io_thread_is_runing; uint64_t io_thread_wait_exit; uint64_t read_pcap_files; uint64_t read_pcap_pkts; }; /****************************************************************************** * Private API -- ring ******************************************************************************/ struct ring_buffer { uint64_t *buff; uint32_t size; uint32_t head; uint32_t tail; }; static struct ring_buffer *ring_buffer_new(uint32_t size) { struct ring_buffer *ring = (struct ring_buffer *)calloc(1, sizeof(struct ring_buffer)); if (ring == NULL) { PCAP_IO_LOG_ERROR("unable to new ring buffer"); return NULL; } ring->buff = (uint64_t *)calloc(size, sizeof(uint64_t)); if (ring->buff == NULL) { PCAP_IO_LOG_ERROR("unable to new ring buffer"); free(ring); return NULL; } ring->size = size; ring->head = 0; ring->tail = 0; return ring; } static void ring_buffer_free(struct ring_buffer *ring) { if (ring) { if (ring->buff) { free(ring->buff); ring->buff = NULL; } free(ring); ring = NULL; } } static int ring_buffer_push(struct ring_buffer *ring, void *data) { if (__sync_val_compare_and_swap(&ring->buff[ring->tail], 0, data) != 0) { PCAP_IO_LOG_ERROR("ring buffer is full, retry later"); return -1; } ring->tail = (ring->tail + 1) % ring->size; return 0; } static void ring_buffer_pop(struct ring_buffer *ring, void **data) { uint64_t read = ATOMIC_READ(&ring->buff[ring->head]); if (read == 0) { *data = NULL; return; } __sync_val_compare_and_swap(&ring->buff[ring->head], read, 0); *data = (void *)read; ring->head = (ring->head + 1) % ring->size; } /****************************************************************************** * Private API -- config ******************************************************************************/ static struct pcap_io_cfg *pcap_io_cfg_new(const char *toml_file) { struct pcap_io_cfg *cfg = (struct pcap_io_cfg *)calloc(1, sizeof(struct pcap_io_cfg)); if (cfg == NULL) { return NULL; } int ret = 0; ret += load_toml_str_config(toml_file, "packet_io.mode", cfg->mode); ret += load_toml_str_config(toml_file, "packet_io.pcap_path", cfg->pcap_path); ret += load_toml_integer_config(toml_file, "packet_io.thread_num", (uint64_t *)&cfg->thread_num, 1, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "packet_io.packet_pool.capacity", &cfg->capacity, 1, 4294967295); ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.fail_action", &cfg->fail_action, 0, 1); ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.timeout_ms", &cfg->timeout_ms, 1, 60000); ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_num", &cfg->frag_queue_num, 1, 4294967295); ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_size", &cfg->frag_queue_size, 2, 65535); if (strcmp(cfg->mode, "pcapfile") != 0 && strcmp(cfg->mode, "pcaplist") != 0) { PCAP_IO_LOG_ERROR("config file invalid packet_io.mode %s", cfg->mode); ret = -1; } if (ret != 0) { free(cfg); return NULL; } else { return cfg; } } static void pcap_io_cfg_free(struct pcap_io_cfg *cfg) { if (cfg) { free(cfg); cfg = NULL; } } static void pcap_io_cfg_print(const struct pcap_io_cfg *cfg) { if (cfg) { PCAP_IO_LOG_INFO("packet_io.mode : %s", cfg->mode); PCAP_IO_LOG_INFO("packet_io.pcap_path : %s", cfg->pcap_path); PCAP_IO_LOG_INFO("packet_io.thread_num : %ld", cfg->thread_num); PCAP_IO_LOG_INFO("packet_io.packet_pool.capacity : %lu", cfg->capacity); PCAP_IO_LOG_INFO("packet_io.ip_reassembly.fail_action : %lu", cfg->fail_action); PCAP_IO_LOG_INFO("packet_io.ip_reassembly.timeout_ms : %lu", cfg->timeout_ms); PCAP_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_num : %lu", cfg->frag_queue_num); PCAP_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_size : %lu", cfg->frag_queue_size); } } /****************************************************************************** * Private API -- pcap ******************************************************************************/ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes) { struct pcap_io *pcap_io = (struct pcap_io *)user; struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)calloc(1, sizeof(struct pcap_pkt) + h->caplen); if (pcap_pkt == NULL) { PCAP_IO_LOG_ERROR("unable to alloc packet"); return; } pcap_pkt->data = (char *)pcap_pkt + sizeof(struct pcap_pkt); pcap_pkt->len = h->caplen; pcap_pkt->ts = h->ts; memcpy((char *)pcap_pkt->data, bytes, h->caplen); ATOMIC_INC(&pcap_io->read_pcap_pkts); struct packet pkt; memset(&pkt, 0, sizeof(struct packet)); packet_parse(&pkt, pcap_pkt->data, pcap_pkt->len); uint64_t hash = packet_ldbc_hash(&pkt, PKT_LDBC_METH_OUTERMOST_INT_EXT_IP, PACKET_DIRECTION_OUTGOING); struct ring_buffer *ring = pcap_io->ring[hash % pcap_io->cfg->thread_num]; while (ring_buffer_push(ring, pcap_pkt) == -1) { if (ATOMIC_READ(&pcap_io->io_thread_need_exit)) { free(pcap_pkt); PCAP_IO_LOG_FATAL("pcap io thread need exit"); pcap_breakloop(pcap_io->pcap); break; } usleep(1000); } if (ATOMIC_READ(&pcap_io->io_thread_need_exit)) { PCAP_IO_LOG_FATAL("pcap io thread need exit"); pcap_breakloop(pcap_io->pcap); } } static int pcap_io_handler(struct pcap_io *pcap_io, const char *pcap_file) { char resolved_path[256]; char pcap_errbuf[PCAP_ERRBUF_SIZE]; realpath(pcap_file, resolved_path); PCAP_IO_LOG_FATAL("pcap %s in-processing", resolved_path) pcap_io->pcap = pcap_open_offline(resolved_path, pcap_errbuf); if (pcap_io->pcap == NULL) { PCAP_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); return -1; } pcap_io->read_pcap_files++; pcap_loop(pcap_io->pcap, -1, pcap_pkt_handler, (u_char *)pcap_io); pcap_close(pcap_io->pcap); PCAP_IO_LOG_FATAL("pcap %s processed", resolved_path) return 0; } static int all_packet_consumed(struct pcap_io *pcap_io) { uint64_t consumed_pkts = 0; uint64_t read_pcap_pkts = ATOMIC_READ(&pcap_io->read_pcap_pkts); for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++) { consumed_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_rx); } if (consumed_pkts == read_pcap_pkts) { return 1; } else { return 0; } } static void *pcap_io_thread(void *arg) { struct pcap_io *pcap_io = (struct pcap_io *)arg; __thread_local_logger = pcap_io->logger; ATOMIC_SET(&pcap_io->io_thread_is_runing, 1); PCAP_IO_LOG_FATAL("pcap io thread is running"); if (strcmp(pcap_io->cfg->mode, "pcapfile") == 0) { pcap_io_handler(pcap_io, pcap_io->cfg->pcap_path); } else { FILE *fp = NULL; if (strcmp(pcap_io->cfg->pcap_path, "-") == 0) { PCAP_IO_LOG_ERROR("pcap path is empty, read from stdin"); fp = stdin; } else { fp = fopen(pcap_io->cfg->pcap_path, "r"); if (fp == NULL) { PCAP_IO_LOG_ERROR("unable to open pcap path: %s", pcap_io->cfg->pcap_path); goto erro_out; } } char line[PATH_MAX]; while (ATOMIC_READ(&pcap_io->io_thread_need_exit) == 0 && fgets(line, sizeof(line), fp)) { if (line[0] == '#') { continue; } char *pos = strchr(line, '\n'); if (pos) { *pos = '\0'; } pcap_io_handler(pcap_io, line); } if (fp != stdin) { fclose(fp); } } PACKET_IO_LOG_FATAL("pcap io thread read all pcap files (files: %lu, pkts: %lu)", pcap_io->read_pcap_files, ATOMIC_READ(&pcap_io->read_pcap_pkts)); erro_out: while (ATOMIC_READ(&pcap_io->io_thread_need_exit) == 0) { if (all_packet_consumed(pcap_io)) { ATOMIC_SET(&pcap_io->io_thread_wait_exit, 1); } usleep(1000); // 1ms } PACKET_IO_LOG_FATAL("pcap io thread exit"); ATOMIC_SET(&pcap_io->io_thread_is_runing, 0); return NULL; } static void origin_free_cb(struct packet *pkt, void *args) { struct pcap_io *pcap_io = (struct pcap_io *)args; struct packet_origin *origin = packet_get_origin(pkt); struct pcap_pkt *pcap_pkt = origin->ctx; struct packet_io_stat *stat = &pcap_io->stat[origin->thr_idx]; struct packet_pool *pool = pcap_io->pool[origin->thr_idx]; stat->pkts_user_freed++; stat->bytes_user_freed += packet_get_raw_len(pkt); free(pcap_pkt); packet_pool_push(pool, pkt); } static struct packet *recv_packet(struct pcap_io *pcap_io, struct pcap_pkt *pcap_pkt, uint16_t thr_idx) { struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; struct ip_reassembly *ip_reass = pcap_io->ip_reass[thr_idx]; struct packet_pool *pool = pcap_io->pool[thr_idx]; if (pcap_pkt == NULL) { return NULL; } stat->pkts_rx++; stat->bytes_rx += pcap_pkt->len; struct packet *pkt = packet_pool_pop(pool); assert(pkt != NULL); struct packet_origin origin = { .type = ORIGIN_TYPE_PCAP, .ctx = pcap_pkt, .cb = origin_free_cb, .args = pcap_io, .thr_idx = thr_idx, }; packet_parse(pkt, pcap_pkt->data, pcap_pkt->len); memset(&pkt->meta, 0, sizeof(pkt->meta)); packet_set_action(pkt, PACKET_ACTION_FORWARD); packet_set_timeval(pkt, &pcap_pkt->ts); packet_set_origin(pkt, &origin); if (packet_is_fragment(pkt)) { return ip_reassembly_defrag(ip_reass, pkt, clock_get_real_time_ms()); } else { return pkt; } } static void send_packet(struct pcap_io *pcap_io, struct packet *pkt, uint16_t thr_idx) { struct pcap_pkt *pcap_pkt = NULL; struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; int len = packet_get_raw_len(pkt); struct packet_origin *origin = packet_get_origin(pkt); if (origin->type == ORIGIN_TYPE_PCAP) { pcap_pkt = (struct pcap_pkt *)origin->ctx; free(pcap_pkt); packet_pool_push(pcap_io->pool[thr_idx], pkt); } else { stat->pkts_injected++; stat->bytes_injected += len; struct tuple6 tuple; char file[PATH_MAX] = {0}; char src_addr[INET6_ADDRSTRLEN] = {0}; char dst_addr[INET6_ADDRSTRLEN] = {0}; memset(&tuple, 0, sizeof(struct tuple6)); packet_get_innermost_tuple6(pkt, &tuple); if (tuple.addr_family == AF_INET) { inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr, INET6_ADDRSTRLEN); inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr, INET6_ADDRSTRLEN); } else { inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN); inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN); } snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected); if (packet_dump_pcap(pkt, file) == -1) { PCAP_IO_LOG_ERROR("unable to dump pcap file: %s", file); } else { PCAP_IO_LOG_FATAL("dump inject packet: %s", file); } packet_free(pkt); } stat->pkts_tx++; stat->bytes_tx += len; } static void drop_packet(struct pcap_io *pcap_io, struct packet *pkt, uint16_t thr_idx) { struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; int len = packet_get_raw_len(pkt); struct packet_origin *origin = packet_get_origin(pkt); stat->pkts_dropped++; stat->bytes_dropped += len; if (origin->type == ORIGIN_TYPE_PCAP) { struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)origin->ctx; free(pcap_pkt); packet_pool_push(pcap_io->pool[thr_idx], pkt); } else { packet_free(pkt); } } /****************************************************************************** * Public API ******************************************************************************/ void *pcap_io_new(const char *toml_file) { pthread_t tid; struct pcap_io *pcap_io = (struct pcap_io *)calloc(1, sizeof(struct pcap_io)); if (pcap_io == NULL) { PCAP_IO_LOG_ERROR("unable to allocate memory for pcap_io"); return NULL; } pcap_io->cfg = pcap_io_cfg_new(toml_file); if (pcap_io->cfg == NULL) { PCAP_IO_LOG_ERROR("unable to create pcap_io_cfg"); goto error_out; } pcap_io_cfg_print(pcap_io->cfg); pcap_io->logger = __thread_local_logger; for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++) { pcap_io->ring[i] = ring_buffer_new(RING_BUFFER_MAX_SIZE); if (pcap_io->ring[i] == NULL) { PCAP_IO_LOG_ERROR("unable to create ring buffer"); goto error_out; } pcap_io->pool[i] = packet_pool_new(pcap_io->cfg->capacity); if (pcap_io->pool[i] == NULL) { PCAP_IO_LOG_ERROR("unable to create packet pool"); goto error_out; } pcap_io->ip_reass[i] = ip_reassembly_new(pcap_io->cfg->timeout_ms, pcap_io->cfg->frag_queue_num, pcap_io->cfg->frag_queue_size); if (pcap_io->ip_reass[i] == NULL) { PCAP_IO_LOG_ERROR("unable to create ip reassembly"); goto error_out; } } if (pthread_create(&tid, NULL, pcap_io_thread, (void *)pcap_io) != 0) { PCAP_IO_LOG_ERROR("unable to create pcap io thread"); goto error_out; } return pcap_io; error_out: pcap_io_free(pcap_io); return NULL; } void pcap_io_free(void *handle) { struct pcap_io *pcap_io = (struct pcap_io *)handle; if (pcap_io) { ATOMIC_SET(&pcap_io->io_thread_need_exit, 1); while (ATOMIC_READ(&pcap_io->io_thread_is_runing)) { usleep(1000); } struct pcap_pkt *pcap_pkt = NULL; for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++) { while (1) { ring_buffer_pop(pcap_io->ring[i], (void **)&pcap_pkt); if (pcap_pkt) { free(pcap_pkt); } else { break; } } ip_reassembly_free(pcap_io->ip_reass[i]); packet_pool_free(pcap_io->pool[i]); ring_buffer_free(pcap_io->ring[i]); } pcap_io_cfg_free(pcap_io->cfg); free(pcap_io); pcap_io = NULL; } } int pcap_io_isbreak(void *handle) { struct pcap_io *pcap_io = (struct pcap_io *)handle; return ATOMIC_READ(&pcap_io->io_thread_wait_exit); } int pcap_io_init(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) { return 0; } int pcap_io_recv(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { struct packet *pkt = NULL; struct pcap_pkt *pcap_pkt = NULL; struct pcap_io *pcap_io = (struct pcap_io *)handle; struct ring_buffer *ring = pcap_io->ring[thr_idx]; int ret = 0; for (int i = 0; i < nr_pkts; i++) { ring_buffer_pop(ring, (void **)&pcap_pkt); pkt = recv_packet(pcap_io, pcap_pkt, thr_idx); if (pkt) { pkts[ret++] = pkt; } } return ret; } void pcap_io_send(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { struct packet *frag = NULL; struct packet *pkt = NULL; struct pcap_io *pcap_io = (struct pcap_io *)handle; for (int i = 0; i < nr_pkts; i++) { pkt = pkts[i]; if (packet_is_defraged(pkt)) { while ((frag = packet_pop_frag(pkt))) { send_packet(pcap_io, frag, thr_idx); } packet_free(pkt); } else { send_packet(pcap_io, pkt, thr_idx); } pkts[i] = NULL; } } void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { struct packet *pkt = NULL; struct packet *frag = NULL; struct pcap_io *pcap_io = (struct pcap_io *)handle; for (int i = 0; i < nr_pkts; i++) { pkt = pkts[i]; if (packet_is_defraged(pkt)) { while ((frag = packet_pop_frag(pkt))) { drop_packet(pcap_io, frag, thr_idx); } packet_free(pkt); } else { drop_packet(pcap_io, pkt, thr_idx); } pkts[i] = NULL; } } void pcap_io_yield(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) { return; } void pcap_io_polling(void *handle, uint16_t thr_idx) { struct pcap_io *pcap_io = (struct pcap_io *)handle; struct ip_reassembly *ip_reass = pcap_io->ip_reass[thr_idx]; struct packet *pkt = NULL; uint64_t now_ms = clock_get_real_time_ms(); while ((pkt = ip_reassembly_clean(ip_reass, now_ms))) { if (pcap_io->cfg->fail_action == 0) { send_packet(pcap_io, pkt, thr_idx); } else { drop_packet(pcap_io, pkt, thr_idx); } } // TODO // output stat } struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx) { struct pcap_io *pcap_io = (struct pcap_io *)handle; return &pcap_io->stat[thr_idx]; }