#include #include #include #include #include #include #include #include #include #include #include "log.h" #include "tuple.h" #include "macro.h" #include "dumpfile_io.h" #include "packet_def.h" #include "packet_ldbc.h" #include "packet_utils.h" #include "packet_parse.h" #include "lock_free_queue.h" #define PACKET_IO_LOG_STATE(format, ...) LOG_STATE("dumpfile io", format, ##__VA_ARGS__) #define PACKET_IO_LOG_ERROR(format, ...) LOG_ERROR("dumpfile io", format, ##__VA_ARGS__) #define MAX_PACKET_QUEUE_SIZE (4096 * 1000) struct dumpfile_io { uint16_t nr_threads; char work_dir[256]; char directory[256]; pcap_t *pcap; struct lock_free_queue *queue[MAX_THREAD_NUM]; struct io_stat stat[MAX_THREAD_NUM]; uint64_t io_thread_need_exit; uint64_t io_thread_is_runing; uint64_t io_thread_wait_exit; }; struct pcap_pkt { char *data; int len; }; struct pcap_file_hdr { unsigned int magic; unsigned short version_major; unsigned short version_minor; unsigned int thiszone; // gmt to local correction unsigned int sigfigs; // accuracy of timestamps unsigned int snaplen; // max length saved portion of each pkt unsigned int linktype; // data link type (LINKTYPE_*) }; struct pcap_pkt_hdr { unsigned int tv_sec; // time stamp unsigned int tv_usec; // time stamp unsigned int caplen; // length of portion present unsigned int len; // length this packet (off wire) }; struct pcap_file_hdr DEFAULT_PCAP_FILE_HDR = { .magic = 0xA1B2C3D4, .version_major = 0x0002, .version_minor = 0x0004, .thiszone = 0, .sigfigs = 0, .snaplen = 0xFFFF, .linktype = 1}; /****************************************************************************** * Private API ******************************************************************************/ static void save_packet(const char *work_dir, struct packet *pkt, uint64_t idx) { int len = 0; FILE *fp = NULL; struct tuple6 tuple; struct timeval ts = {0}; struct pcap_pkt_hdr pcap_hdr = {0}; char file[256] = {0}; char src_addr[INET6_ADDRSTRLEN] = {0}; char dst_addr[INET6_ADDRSTRLEN] = {0}; len = packet_get_raw_len(pkt); memset(&tuple, 0, sizeof(struct tuple6)); packet_get_innermost_tuple6(pkt, &tuple); if (tuple.ip_type == IP_TYPE_V4) { inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr, INET6_ADDRSTRLEN); inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr, INET6_ADDRSTRLEN); } else { inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN); inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN); } snprintf(file, sizeof(file), "%s/inject-%s:%u-%s:%u-%lu.pcap", work_dir, src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), idx); fp = fopen(file, "w+"); if (fp) { gettimeofday(&ts, NULL); pcap_hdr.tv_sec = ts.tv_sec; pcap_hdr.tv_usec = ts.tv_usec; pcap_hdr.caplen = len; pcap_hdr.len = len; fwrite(&DEFAULT_PCAP_FILE_HDR, sizeof(DEFAULT_PCAP_FILE_HDR), 1, fp); fwrite(&pcap_hdr, sizeof(struct pcap_pkt_hdr), 1, fp); fwrite(packet_get_raw_data(pkt), 1, len, fp); fflush(fp); fclose(fp); PACKET_IO_LOG_STATE("save packet to %s", file); } else { PACKET_IO_LOG_ERROR("unable to write pcap file: %s, %s", file, strerror(errno)); } } typedef int file_handle(const char *file, void *arg); static int scan_directory(const char *dir, file_handle *handler, void *arg) { struct stat statbuf; struct dirent *entry; DIR *dp = opendir(dir); if (NULL == dp) { PACKET_IO_LOG_ERROR("opendir %s failed, %s", dir, strerror(errno)); return -1; } if (chdir(dir) == -1) { PACKET_IO_LOG_ERROR("chdir %s failed, %s", dir, strerror(errno)); goto error_out; } while ((entry = readdir(dp))) { if (lstat(entry->d_name, &statbuf) == -1) { PACKET_IO_LOG_ERROR("lstat %s failed, %s", entry->d_name, strerror(errno)); goto error_out; } if (S_IFDIR & statbuf.st_mode) { if (!strcmp(".", entry->d_name) || !strcmp("..", entry->d_name)) { continue; } if (scan_directory(entry->d_name, handler, arg) == -1) { goto error_out; } } else { if (handler(entry->d_name, arg) == -1) { goto error_out; } } } if (chdir("..") == -1) { PACKET_IO_LOG_ERROR("chdir .. failed, %s", strerror(errno)); goto error_out; } closedir(dp); return 0; error_out: closedir(dp); return -1; } static void pcap_packet_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes) { struct dumpfile_io *handle = (struct dumpfile_io *)user; // copy packet data to new memory struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)calloc(1, sizeof(struct pcap_pkt) + h->caplen); if (pcap_pkt == NULL) { PACKET_IO_LOG_ERROR("unable to alloc packet"); return; } pcap_pkt->data = (char *)pcap_pkt + sizeof(struct pcap_pkt); pcap_pkt->len = h->caplen; memcpy((char *)pcap_pkt->data, bytes, h->caplen); // calculate packet hash struct packet pkt; memset(&pkt, 0, sizeof(struct packet)); packet_parse(&pkt, pcap_pkt->data, pcap_pkt->len); uint64_t hash = packet_ldbc_hash(&pkt, LDBC_HASH_OUTERMOST_INT_EXT_IP, PACKET_DIRECTION_OUTGOING); // push packet to queue struct lock_free_queue *queue = handle->queue[hash % handle->nr_threads]; while (lock_free_queue_push(queue, pcap_pkt) == -1) { if (ATOMIC_READ(&handle->io_thread_need_exit)) { free(pcap_pkt); PACKET_IO_LOG_STATE("dumpfile io thread need exit"); pcap_breakloop(handle->pcap); break; } usleep(1000); } if (ATOMIC_READ(&handle->io_thread_need_exit)) { PACKET_IO_LOG_STATE("dumpfile io thread need exit"); pcap_breakloop(handle->pcap); } } static int dumpfile_handler(const char *file, void *arg) { char resolved_path[256]; char pcap_errbuf[PCAP_ERRBUF_SIZE]; struct dumpfile_io *handle = (struct dumpfile_io *)arg; realpath(file, resolved_path); PACKET_IO_LOG_STATE("dumpfile %s in-processing", resolved_path) handle->pcap = pcap_open_offline(file, pcap_errbuf); if (handle->pcap == NULL) { PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); return -1; } pcap_loop(handle->pcap, -1, pcap_packet_handler, (u_char *)handle); pcap_close(handle->pcap); PACKET_IO_LOG_STATE("dumpfile %s processed", resolved_path) return 0; } static int all_packet_processed(struct dumpfile_io *handle) { for (uint16_t i = 0; i < handle->nr_threads; i++) { if (!lock_free_queue_empty(handle->queue[i])) { return 0; } } return 1; } static void *dumpfile_thread(void *arg) { struct dumpfile_io *handle = (struct dumpfile_io *)arg; ATOMIC_SET(&handle->io_thread_is_runing, 1); PACKET_IO_LOG_STATE("dumpfile io thread is running"); scan_directory(handle->directory, dumpfile_handler, arg); while (ATOMIC_READ(&handle->io_thread_need_exit) == 0) { if (all_packet_processed(handle)) { ATOMIC_SET(&handle->io_thread_wait_exit, 1); } PACKET_IO_LOG_STATE("dumpfile io thread waiting"); sleep(1); } PACKET_IO_LOG_STATE("dumpfile io thread exit !!!"); ATOMIC_SET(&handle->io_thread_is_runing, 0); return NULL; } /****************************************************************************** * Public API ******************************************************************************/ struct dumpfile_io *dumpfile_io_new(const char *directory, uint16_t nr_threads) { pthread_t tid; struct dumpfile_io *handle = (struct dumpfile_io *)calloc(1, sizeof(struct dumpfile_io)); if (handle == NULL) { PACKET_IO_LOG_ERROR("unable to allocate memory for dumpfile_io"); return NULL; } if (getcwd(handle->work_dir, sizeof(handle->work_dir)) == NULL) { PACKET_IO_LOG_ERROR("unable to get current work directory"); goto error_out; } handle->nr_threads = nr_threads; strncpy(handle->directory, directory, MIN(strlen(directory), sizeof(handle->directory))); for (uint16_t i = 0; i < handle->nr_threads; i++) { handle->queue[i] = lock_free_queue_new(MAX_PACKET_QUEUE_SIZE); if (handle->queue[i] == NULL) { PACKET_IO_LOG_ERROR("unable to create packet queue"); goto error_out; } } if (pthread_create(&tid, NULL, dumpfile_thread, (void *)handle) != 0) { PACKET_IO_LOG_ERROR("unable to create packet io thread"); goto error_out; } return handle; error_out: dumpfile_io_free(handle); return NULL; } void dumpfile_io_free(struct dumpfile_io *handle) { if (handle) { ATOMIC_SET(&handle->io_thread_need_exit, 1); while (ATOMIC_READ(&handle->io_thread_is_runing)) { usleep(1000); } struct pcap_pkt *pcap_pkt = NULL; for (uint16_t i = 0; i < handle->nr_threads; i++) { while (1) { lock_free_queue_pop(handle->queue[i], (void **)&pcap_pkt); if (pcap_pkt) { free(pcap_pkt); } else { break; } } lock_free_queue_free(handle->queue[i]); } free(handle); handle = NULL; } } int dumpfile_io_wait_exit(struct dumpfile_io *handle) { return ATOMIC_READ(&handle->io_thread_wait_exit); } int dumpfile_io_init(struct dumpfile_io *handle, uint16_t thr_idx) { return 0; } int dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) { struct lock_free_queue *queue = handle->queue[thr_idx]; struct io_stat *stat = &handle->stat[thr_idx]; struct pcap_pkt *pcap_pkt = NULL; struct packet *pkt; int nr_parsed = 0; for (int i = 0; i < nr_pkts; i++) { lock_free_queue_pop(queue, (void **)&pcap_pkt); if (pcap_pkt == NULL) { break; } else { stat->dev_rx_pkts++; stat->dev_rx_bytes += pcap_pkt->len; stat->raw_rx_pkts++; stat->raw_rx_bytes += pcap_pkt->len; pkt = &pkts[nr_parsed]; memset(pkt, 0, sizeof(struct packet)); packet_parse(pkt, pcap_pkt->data, pcap_pkt->len); packet_set_origin_ctx(pkt, pcap_pkt); packet_set_action(pkt, PACKET_ACTION_FORWARD); nr_parsed++; } } return nr_parsed; } void dumpfile_io_egress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) { int len; struct packet *pkt = NULL; struct io_stat *stat = &handle->stat[thr_idx]; for (int i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; len = packet_get_raw_len(pkt); stat->dev_tx_pkts++; stat->dev_tx_bytes += len; stat->raw_tx_pkts++; stat->raw_tx_bytes += len; struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt); if (pcap_pkt) { free(pcap_pkt); } packet_free(pkt); } } void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) { struct packet *pkt = NULL; struct io_stat *stat = &handle->stat[thr_idx]; for (int i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt); if (pcap_pkt) { stat->drop_pkts++; stat->drop_bytes += packet_get_raw_len(pkt); free(pcap_pkt); } packet_free(pkt); } } int dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) { int len; struct packet *pkt = NULL; struct io_stat *stat = &handle->stat[thr_idx]; for (int i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; len = packet_get_raw_len(pkt); stat->inject_pkts++; stat->inject_bytes += len; stat->raw_tx_pkts++; stat->raw_tx_bytes += len; stat->dev_tx_pkts++; stat->dev_tx_bytes += len; save_packet(handle->work_dir, pkt, stat->inject_pkts); packet_free(pkt); } return nr_pkts; } void dumpfile_io_yield(struct dumpfile_io *handle, uint16_t thr_idx, uint64_t timeout_ms) { return; } struct io_stat *dumpfile_io_stat(struct dumpfile_io *handle, uint16_t thr_idx) { return &handle->stat[thr_idx]; }