/* ********************************************************************************************** * File: pcap_file.cpp * Description: pcap file runmode api * Authors: Liu WenTan * Date: 2022-07-15 * Copyright: (c) 2018-2022 Geedge Networks, Inc. All rights reserved. *********************************************************************************************** */ #include #include #include #include #include #include #include #include #include "utils.h" #include "util_errors.h" #include "logger.h" #include "time_helper.h" #include "pio_pcap_file.h" #include "packet_io.h" #include "packet_io_util.h" #include "packet_io_internal.h" #define MAX_RECV_BURST 64 struct safe_pending_file_queue g_pending_file_queue; /** * @brief validate path is a valid plain file or directory * * @retval failed (-1) successful (0), * if success, dir == nullptr <---> means path is plain file * dir != nullptr <---> means path is directory */ static ssize_t validate_directory_or_file(const char *path, DIR **dir) { DIR *temp_dir = nullptr; ssize_t ret = -1; temp_dir = opendir(path); if (nullptr == temp_dir) { switch (errno) { case EACCES: log_error(ST_ERR_FOPEN, "%s: Permission denied", path); break; case EBADF: log_error(ST_ERR_FOPEN, "%s: invalid file descriptor", path); break; case ENOTDIR: log_info("%s: is a plain file, not directory", path); ret = 0; break; default: log_error(ST_ERR_FOPEN, "%s: errno:%d", path, errno); } } else { *dir = temp_dir; ret = 0; } return ret; } static ssize_t init_pcap_file(struct pcap_plain_file_info *pfile_info) { char errbuf[PCAP_ERRBUF_SIZE] = ""; if (nullptr == pfile_info || nullptr == pfile_info->file_name) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid pfile_info pointer or file_name"); return -1; } pfile_info->pcap_handle = pcap_open_offline(pfile_info->file_name, errbuf); if (nullptr == pfile_info->pcap_handle) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "open pcap file:%s failed.", pfile_info->file_name); return -1; } pthread_mutex_init(&pfile_info->handle_mutex, nullptr); if (pfile_info->shared != nullptr && pfile_info->shared->bpf_string != nullptr) { if (pcap_compile(pfile_info->pcap_handle, &pfile_info->filter, pfile_info->shared->bpf_string, 1, 0) < 0) { log_error(ST_ERR_BPF, "bpf compilation error %s for %s", pcap_geterr(pfile_info->pcap_handle), pfile_info->file_name); return -1; } if (pcap_setfilter(pfile_info->pcap_handle, &pfile_info->filter) < 0) { log_error(ST_ERR_BPF, "could not set bpf filter %s for %s", pcap_geterr(pfile_info->pcap_handle), pfile_info->file_name); pcap_freecode(&pfile_info->filter); return -1; } pcap_freecode(&pfile_info->filter); } pfile_info->data_link = pcap_datalink(pfile_info->pcap_handle); return 0; } static ssize_t pcap_plain_file_init(struct pio_pcap_file_device_context *pfile_dev_ctx, const char *file_name) { if (nullptr == pfile_dev_ctx) { return -1; } struct pcap_plain_file_info *pfile_info = CALLOC(struct pcap_plain_file_info, 1); if (nullptr == pfile_info) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "alloc pcap_plain_file_info failed."); return -1; } /* TODO: get conf and assign pfile_info */ int ret = strncpy_safe(pfile_info->file_name, file_name, sizeof(pfile_info->file_name)); if (ret < 0) { log_error(ST_ERR_STR_COPY, "pcap plain file name copy failed."); return -1; } pfile_info->shared = &pfile_dev_ctx->shared; ret = init_pcap_file(pfile_info); if (ret < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "init_pcap_file failed."); FREE(pfile_info); return -1; } else { pfile_dev_ctx->is_dir = 0; pfile_dev_ctx->entity.file = pfile_info; } return 0; } static ssize_t pcap_directory_file_init(struct pio_pcap_file_device_context *pfile_dev_ctx, const char *dir_name, DIR *directory) { if (nullptr == pfile_dev_ctx) { return -1; } struct pcap_file_directory_info *pdir_info = CALLOC(struct pcap_file_directory_info, 1); if (nullptr == pdir_info) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "alloc pcap_file_directory_info failed."); return -1; } /* TODO: get conf and assign pdir_info */ int ret = strncpy_safe(pdir_info->dir_name, dir_name, sizeof(pdir_info->dir_name)); if (ret < 0) { log_error(ST_ERR_STR_COPY, "pcap directory name copy failed."); return -1; } /* TODO: if should configurable */ pdir_info->delay = 30; pdir_info->shared = &pfile_dev_ctx->shared; pdir_info->directory = directory; pfile_dev_ctx->is_dir = 1; pfile_dev_ctx->entity.dir = pdir_info; /* init global pending file queue */ TAILQ_INIT(&g_pending_file_queue.file_queue_head); pthread_mutex_init(&g_pending_file_queue.queue_mutex, nullptr); /* pcap file device context mutex init */ pthread_mutex_init(&pfile_dev_ctx->ctx_mutex, nullptr); return 0; } static ssize_t pcap_file_shared_init(struct pio_pcap_file_device_context *pfile_dev_ctx) { if (nullptr == pfile_dev_ctx) { return -1; } /* TODO: get conf and assign pfile_dev_ctx->shared */ if ((g_packet_io_config.common.mode == PACKET_IO_RUN_MODE_PCAP_FILE) && g_packet_io_config.pcap.bpf_string != nullptr) { memset(pfile_dev_ctx->shared.bpf_string, 0, sizeof(pfile_dev_ctx->shared.bpf_string)); ssize_t ret = strncpy_safe(pfile_dev_ctx->shared.bpf_string, g_packet_io_config.pcap.bpf_string, sizeof(pfile_dev_ctx->shared.bpf_string)); if (ret < 0) { log_error(ST_ERR_STR_COPY, "pcap file bpf string copy failed."); return -1; } } pfile_dev_ctx->shared.should_delete = g_packet_io_config.pcap.should_delete; /* init pcap file device packet queue */ for (uint32_t i = 0; i < VIRTUAL_QUEUE_MAX_NUM; i++) { pio_packet_queue_init(&pfile_dev_ctx->pkt_queues[i]); } return 0; } static void cleanup_pcap_plain_file_info(struct pcap_plain_file_info *pfile_info) { if (pfile_info != nullptr) { if (pfile_info->pcap_handle != nullptr) { pcap_close(pfile_info->pcap_handle); pfile_info->pcap_handle = nullptr; } if (pfile_info->file_name != nullptr) { if (pfile_info->shared != nullptr && pfile_info->shared->should_delete) { log_debug("Deleting pcap file:%s", pfile_info->file_name); if (unlink(pfile_info->file_name) != 0) { log_notice(ST_ERR_PCAP_FILE_DELETE_FAILED, "Failed to delete %s", pfile_info->file_name); } } pfile_info->shared = nullptr; } } } static void cleanup_pcap_directory_info(struct pcap_file_directory_info *pdir_info) { } ssize_t pio_pcap_file_device_open(struct packet_io_device *pdev) { ssize_t status = -1; DIR *directory = nullptr; if (nullptr == pdev) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid packet_io_device pointer."); return -1; } pdev->entity.pcap_file_dev_ctx = CALLOC(struct pio_pcap_file_device_context, 1); if (nullptr == pdev->entity.pcap_file_dev_ctx) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "alloc pcap_file_dev_ctx failed."); return -1; } pdev->entity.pcap_file_dev_ctx->pdev = pdev; status = pcap_file_shared_init(pdev->entity.pcap_file_dev_ctx); if (status < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "pcap file shared init failed."); return -1; } if (validate_directory_or_file(pdev->dev_name, &directory) != 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid path:%s (not plain file or directory)", pdev->dev_name); return -1; } if (nullptr == directory) { /* plain file */ status = pcap_plain_file_init(pdev->entity.pcap_file_dev_ctx, pdev->dev_name); if (status < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "pcap plain file init failed."); return -1; } } else { /* directory */ status = pcap_directory_file_init(pdev->entity.pcap_file_dev_ctx, pdev->dev_name, directory); if (status < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "pcap directory file init failed."); return -1; } } return 0; } ssize_t pio_pcap_file_device_close(struct packet_io_device *pdev) { if (nullptr == pdev) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid pdev pointer so close pcap file device failed!"); return -1; } if (pdev->entity.pcap_file_dev_ctx != nullptr) { if (!pdev->entity.pcap_file_dev_ctx->is_dir && pdev->entity.pcap_file_dev_ctx->entity.file != nullptr) { cleanup_pcap_plain_file_info(pdev->entity.pcap_file_dev_ctx->entity.file); } if (pdev->entity.pcap_file_dev_ctx->is_dir && pdev->entity.pcap_file_dev_ctx->entity.dir != nullptr) { cleanup_pcap_directory_info(pdev->entity.pcap_file_dev_ctx->entity.dir); } for (uint32_t i = 0; i < VIRTUAL_QUEUE_MAX_NUM; i++) { if (pdev->entity.pcap_file_dev_ctx->pkt_queues[i].len != 0) { release_pio_packet_queue(&pdev->entity.pcap_file_dev_ctx->pkt_queues[i]); } } FREE(pdev->entity.pcap_file_dev_ctx); } return 0; } void pcap_file_pkt_callback_oneshot(char *user, struct pcap_pkthdr *pkt_hdr, u_char *pkt) { struct pio_pcap_file_device_context *pfile_dev_ctx = (struct pio_pcap_file_device_context *)user; uint32_t p_len = 0; if (pkt_hdr->caplen < DEFAULT_MTU) { p_len = COMMON_SIZE_OF_PIO_PACKET; } else { p_len = MAX_SIZE_OF_PIO_PACKET; } struct pio_packet *p = (struct pio_packet *)malloc(p_len); if (nullptr == p) { return; } memset(p, 0, p_len); p->pkt_hdr = p; p->pkt_payload = (uint8_t *)p + CUSTOM_ZONE_LEN; p->pkt_len = pkt_hdr->caplen; //p->data_link = pfile_dev_ctx->entity.file->data_link; p->data_link = 1; if (packet_copy_data((uint8_t *)p->pkt_payload, (uint8_t *)pkt, pkt_hdr->caplen)) { FREE(p); return; } /* nr_rxq <= VIRTUAL_QUEUE_MAX_NUM */ uint16_t nr_rxq = pfile_dev_ctx->pdev->rxq_num; uint16_t rxq_id = pio_packet_hash(p) % nr_rxq; /* hash to specific queue id and enqueue */ pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); pio_packet_enqueue(&pfile_dev_ctx->pkt_queues[rxq_id], p); pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); } static ssize_t pcap_directory_file_pkts_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id) { int packet_q_len = MAX_RECV_BURST; ssize_t res = -1; res = pcap_dispatch(pfile_dev_ctx->entity.dir->current_file[rxq_id]->pcap_handle, packet_q_len, (pcap_handler)pcap_file_pkt_callback_oneshot, (u_char *)pfile_dev_ctx); if (res < 0) { log_error(ST_ERR_PCAP_DISPATCH, "error code %ld %s for %s", res, pcap_geterr(pfile_dev_ctx->entity.dir->current_file[rxq_id]->pcap_handle), pfile_dev_ctx->entity.dir->current_file[rxq_id]->file_name); } else if (res == 0) { log_info("reach end of pcap file %s (error code %d)", pfile_dev_ctx->entity.dir->current_file[rxq_id]->file_name, res); } return res; } static ssize_t get_pcap_file_pkts(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id, struct stellar_packet **pkts, size_t nr_pkts) { ssize_t res = -1; size_t i = 0; uint32_t q_len = 0; struct pio_packet *p = nullptr; pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); if (pfile_dev_ctx->pkt_queues[rxq_id].len > 0) { do { p = pio_packet_dequeue(&pfile_dev_ctx->pkt_queues[rxq_id]); q_len = pfile_dev_ctx->pkt_queues[rxq_id].len; pkts[i] = (struct stellar_packet *)p; i++; } while ((q_len != 0) && (i < nr_pkts)); } pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); if (q_len == 0) { res = i; } else { res = nr_pkts; } return res; } static ssize_t pcap_file_pkts_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx) { int packet_q_len = MAX_RECV_BURST; ssize_t res = -1; pthread_mutex_lock(&pfile_dev_ctx->entity.file->handle_mutex); res = pcap_dispatch(pfile_dev_ctx->entity.file->pcap_handle, packet_q_len, (pcap_handler)pcap_file_pkt_callback_oneshot, (u_char *)pfile_dev_ctx); pthread_mutex_unlock(&pfile_dev_ctx->entity.file->handle_mutex); if (res < 0) { log_error(ST_ERR_PCAP_DISPATCH, "error code %ld %s for %s", res, pcap_geterr(pfile_dev_ctx->entity.file->pcap_handle), pfile_dev_ctx->entity.file->file_name); } else if (res == 0) { log_info("reach end of pcap file %s (error code %d)", pfile_dev_ctx->entity.file->file_name, res); } return res; } static ssize_t pcap_directory_get_modified_time(char *pfile, struct timespec *out) { struct stat buf; ssize_t ret = -1; if (nullptr == pfile) { return ret; } if ((ret = stat(pfile, &buf)) != 0) { return ret; } out->tv_sec = buf.st_mtim.tv_sec; out->tv_nsec = buf.st_mtim.tv_nsec; return ret; } struct pending_file * find_pending_file_to_add(struct pio_pcap_file_device_context *pfile_dev_ctx, struct dirent *dir, struct timespec *deadline) { char abs_path[PATH_MAX] = {0}; snprintf(abs_path, sizeof(abs_path), "%s/%s", pfile_dev_ctx->entity.dir->dir_name, dir->d_name); struct timespec temp_time; memset(&temp_time, 0, sizeof(struct timespec)); if (pcap_directory_get_modified_time(abs_path, &temp_time) < 0) { log_debug("unable to get modified time on %s, skipping", abs_path); return nullptr; } /* skip files outside of out time range */ if (compare_timespec(&temp_time, &pfile_dev_ctx->shared.last_processed_ts) <= 0) { log_debug("skipping old file %s", abs_path); return nullptr; } else if (compare_timespec(&temp_time, deadline) >= 0) { log_debug("skipping new file %s", abs_path); return nullptr; } struct pending_file *file_to_add = CALLOC(struct pending_file, 1); ssize_t ret = strncpy_safe(file_to_add->file_name, abs_path, sizeof(file_to_add->file_name)); if (ret < 0) { log_error(ST_ERR_STR_COPY, "file_to_add file name copy failed."); return nullptr; } copy_timespec(&temp_time, &file_to_add->modified_time); log_info("found \"%s\" at %" PRIuMAX, file_to_add->file_name, (uintmax_t)timespec_to_millisecond(&file_to_add->modified_time)); return file_to_add; } static ssize_t pcap_directory_insert_file(struct pending_file *file_to_add) { if (nullptr == file_to_add) { log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "invalid directory or file parameters."); return -1; } log_debug("inserting %s into pending file queue", file_to_add->file_name); pthread_mutex_lock(&g_pending_file_queue.queue_mutex); if (TAILQ_EMPTY(&g_pending_file_queue.file_queue_head)) { TAILQ_INSERT_TAIL(&g_pending_file_queue.file_queue_head, file_to_add, next); } else { /* pending file queue is not empty */ struct pending_file *file_to_compare = TAILQ_FIRST(&g_pending_file_queue.file_queue_head); while (file_to_compare != nullptr) { if (compare_timespec(&file_to_add->modified_time, &file_to_compare->modified_time) < 0) { TAILQ_INSERT_BEFORE(file_to_compare, file_to_add, next); file_to_compare = nullptr; } else if (compare_timespec(&file_to_add->modified_time, &file_to_compare->modified_time) > 0) { struct pending_file *next_file_to_compare = TAILQ_NEXT(file_to_compare, next); if (next_file_to_compare == nullptr) { TAILQ_INSERT_AFTER(&g_pending_file_queue.file_queue_head, file_to_compare, file_to_add, next); } file_to_compare = next_file_to_compare; } else { /* find same file, ignore it */ printf("find same file\n"); break; } } } pthread_mutex_unlock(&g_pending_file_queue.queue_mutex); return 0; } #define SWAPLONG(y) \ (((((u_int)(y))&0xff)<<24) | \ ((((u_int)(y))&0xff00)<<8) | \ ((((u_int)(y))&0xff0000)>>8) | \ ((((u_int)(y))>>24)&0xff)) #define SWAPSHORT(y) \ ((u_short)(((((u_int)(y))&0xff)<<8) | \ ((((u_int)(y))&0xff00)>>8))) /* Standard libpcap format. */ #define TCPDUMP_MAGIC 0xa1b2c3d4 /* Alexey Kuznetzov's modified libpcap format. */ #define KUZNETZOV_TCPDUMP_MAGIC 0xa1b2cd34 /* * Normal libpcap format, except for seconds/nanoseconds timestamps, * as per a request by Ulf Lamping */ #define NSEC_TCPDUMP_MAGIC 0xa1b23c4d #define LT_RESERVED1(x) ((x) & 0x03FF0000) static ssize_t pcap_header_check(const uint8_t *magic, FILE *fp) { int swapped = 0; size_t amt_read; uint32_t magic_int; struct pcap_file_header hdr; /* * Check whether the first 4 bytes of the file are the magic * number for a pcap savefile, or for a byte-swapped pcap * savefile. */ memcpy(&magic_int, magic, sizeof(magic_int)); if (magic_int != TCPDUMP_MAGIC && magic_int != KUZNETZOV_TCPDUMP_MAGIC && magic_int != NSEC_TCPDUMP_MAGIC) { magic_int = SWAPLONG(magic_int); if (magic_int != TCPDUMP_MAGIC && magic_int != KUZNETZOV_TCPDUMP_MAGIC && magic_int != NSEC_TCPDUMP_MAGIC) { return -1; } swapped = 1; } /* * They are. Put the magic number in the header, and read * the rest of the header. */ hdr.magic = magic_int; amt_read = fread(((char *)&hdr) + sizeof hdr.magic, 1, sizeof(hdr) - sizeof(hdr.magic), fp); if (amt_read != sizeof(hdr) - sizeof(hdr.magic)) { return -1; } /* * If it's a byte-swapped capture file, byte-swap the header. */ if (swapped) { hdr.version_major = SWAPSHORT(hdr.version_major); hdr.version_minor = SWAPSHORT(hdr.version_minor); hdr.thiszone = SWAPLONG(hdr.thiszone); hdr.sigfigs = SWAPLONG(hdr.sigfigs); hdr.snaplen = SWAPLONG(hdr.snaplen); hdr.linktype = SWAPLONG(hdr.linktype); } if (hdr.version_major < PCAP_VERSION_MAJOR) { return -1; } /* * currently only versions 2.[0-4] are supported with * the exception of 543.0 for DG/UX tcpdump. */ if (! ((hdr.version_major == PCAP_VERSION_MAJOR && hdr.version_minor <= PCAP_VERSION_MINOR) || (hdr.version_major == 543 && hdr.version_minor == 0))) { return -1; } /* * Check the main reserved field. */ if (LT_RESERVED1(hdr.linktype) != 0) { return -1; } return 0; } #define BT_SHB 0x0A0D0D0A #define BT_SHB_INSANE_MAX 1024U*1024U*1U /* 1MB should be enough */ #define BYTE_ORDER_MAGIC 0x1A2B3C4D struct section_header_block { uint32_t byte_order_magic; u_short major_version; u_short minor_version; uint64_t section_length; /* followed by options and trailer */ }; struct block_header { uint32_t block_type; uint32_t total_length; }; static ssize_t pcapng_header_check(const uint8_t *magic, FILE *fp) { uint32_t magic_int; uint32_t total_length; uint32_t byte_order_magic; size_t amt_read; struct block_header *bhdrp; struct section_header_block *shbp; /* * Check whether the first 4 bytes of the file are the block * type for a pcapng savefile. */ memcpy(&magic_int, magic, sizeof(magic_int)); if (magic_int != BT_SHB) { return -1; } /* * OK, they are. However, that's just \n\r\r\n, so it could, * conceivably, be an ordinary text file. * * It could not, however, conceivably be any other type of * capture file, so we can read the rest of the putative * Section Header Block; put the block type in the common * header, read the rest of the common header and the * fixed-length portion of the SHB, and look for the byte-order * magic value. */ amt_read = fread(&total_length, 1, sizeof(total_length), fp); if (amt_read < sizeof(total_length)) { return -1; } amt_read = fread(&byte_order_magic, 1, sizeof(byte_order_magic), fp); if (amt_read < sizeof(byte_order_magic)) { return -1; } if (byte_order_magic != BYTE_ORDER_MAGIC) { byte_order_magic = SWAPLONG(byte_order_magic); if (byte_order_magic != BYTE_ORDER_MAGIC) { return -1; } total_length = SWAPLONG(total_length); } /* * Check the sanity of the total length. */ if (total_length < sizeof(*bhdrp) + sizeof(*shbp) + sizeof(total_length) || (total_length > BT_SHB_INSANE_MAX)) { return -1; } return 0; } static ssize_t validate_pcap_file(struct pio_pcap_file_device_context *pfile_dev_ctx, const char *file_name) { uint8_t magic[4]; char abs_path[PATH_MAX] = {0}; snprintf(abs_path, sizeof(abs_path), "%s/%s", pfile_dev_ctx->entity.dir->dir_name, file_name); FILE *fp = fopen(abs_path, "r"); if (nullptr == fp) { return -1; } size_t ret = fread(&magic, 1, sizeof(magic), fp); if (ret != sizeof(magic)) { return -1; } if (pcap_header_check(magic, fp) == 0) { return 0; } else if (pcapng_header_check(magic, fp) == 0) { return 0; } return -1; } static ssize_t pcap_directory_collect_pending_files(struct pio_pcap_file_device_context *pfile_dev_ctx, struct timespec *deadline) { if (nullptr == pfile_dev_ctx) { return -1; } if (strlen(pfile_dev_ctx->entity.dir->dir_name) == 0) { log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "invalid directory name."); return -1; } struct dirent *dir = nullptr; struct pending_file *file_to_add = nullptr; while ((dir = readdir(pfile_dev_ctx->entity.dir->directory)) != nullptr) { /* ignore non plain file */ if (dir->d_type != DT_REG) { continue; } /* ignore . and .. */ if (strcmp(dir->d_name, ".") == 0 || strcmp(dir->d_name, "..") == 0) { continue; } /* ignore non pcap file */ if (validate_pcap_file(pfile_dev_ctx, dir->d_name) < 0) { continue; } file_to_add = find_pending_file_to_add(pfile_dev_ctx, dir, deadline); if (nullptr == file_to_add) { continue; } if (pcap_directory_insert_file(file_to_add) < 0) { log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "failed to insert file into directory"); FREE(file_to_add); return -1; } } return 0; } static ssize_t pcap_directory_files_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id) { ssize_t res = -1; struct timespec deadline; memset(&deadline, 0, sizeof(struct timespec)); get_current_timespec(&deadline); /* the newest file which can be processed */ deadline.tv_sec = deadline.tv_sec - pfile_dev_ctx->entity.dir->delay; /* collect pending files in current directory */ if (pcap_directory_collect_pending_files(pfile_dev_ctx, &deadline) < 0) { log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "failed to collect pending files in directory."); return -1; } struct timespec last_time_seen; memset(&last_time_seen, 0, sizeof(struct timespec)); /* not open file yet */ if (nullptr == pfile_dev_ctx->entity.dir->current_file[rxq_id]) { pthread_mutex_lock(&g_pending_file_queue.queue_mutex); /* file_queue is empty */ if (TAILQ_EMPTY(&g_pending_file_queue.file_queue_head)) { pthread_mutex_unlock(&g_pending_file_queue.queue_mutex); log_info("directory %s has no files to process", pfile_dev_ctx->entity.dir->dir_name); return 0; } struct pending_file *current_file = TAILQ_FIRST(&g_pending_file_queue.file_queue_head); TAILQ_REMOVE(&g_pending_file_queue.file_queue_head, current_file, next); pthread_mutex_unlock(&g_pending_file_queue.queue_mutex); pfile_dev_ctx->entity.dir->pending_file[rxq_id] = current_file; log_info("processing file %s", current_file->file_name); struct pcap_plain_file_info *pfile_info = CALLOC(struct pcap_plain_file_info, 1); if (nullptr == pfile_info) { log_error(ST_ERR_MEM_ALLOC, "alloc pcap_plain_file_info failed."); return -1; } res = strncpy_safe(pfile_info->file_name, current_file->file_name, sizeof(pfile_info->file_name)); if (res < 0) { FREE(pfile_info); FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); log_error(ST_ERR_STR_COPY, "pfile_info file name copy failed."); return -1; } pfile_info->shared = &pfile_dev_ctx->shared; /* init_pcap_file is thread unsafe, because it calls pcap_compile(thread unsafe) */ pthread_mutex_lock(&pfile_dev_ctx->ctx_mutex); res = init_pcap_file(pfile_info); pthread_mutex_unlock(&pfile_dev_ctx->ctx_mutex); if (res < 0) { FREE(pfile_info); FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "init_pcap_file failed."); return -1; } else { pfile_dev_ctx->entity.dir->current_file[rxq_id] = pfile_info; res = pcap_directory_file_pkts_dispatch(pfile_dev_ctx, rxq_id); if (res < 0) { FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); return -1; } log_info("processed file %s, processed up to %" PRIuMAX, current_file->file_name, (uintmax_t)timespec_to_millisecond(¤t_file->modified_time)); if (compare_timespec(¤t_file->modified_time, &last_time_seen) > 0) { copy_timespec(¤t_file->modified_time, &last_time_seen); } if (res == 0) { // reach the end of the file cleanup_pcap_plain_file_info(pfile_dev_ctx->entity.dir->current_file[rxq_id]); FREE(pfile_dev_ctx->entity.dir->current_file[rxq_id]); FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); } } } else { /* file has been opened */ res = pcap_directory_file_pkts_dispatch(pfile_dev_ctx, rxq_id); if (res < 0) { FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); return -1; } else if (res == 0) { cleanup_pcap_plain_file_info(pfile_dev_ctx->entity.dir->current_file[rxq_id]); FREE(pfile_dev_ctx->entity.dir->current_file[rxq_id]); FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); } } if (compare_timespec(&last_time_seen, &pfile_dev_ctx->shared.last_processed_ts) > 0) { log_info("updating processed to %" PRIuMAX, (uintmax_t)timespec_to_millisecond(&last_time_seen)); copy_timespec(&last_time_seen, &pfile_dev_ctx->shared.last_processed_ts); } return res; } ssize_t pio_pcap_file_device_receive(struct packet_io_device *pdev, uint32_t rxq_id, struct stellar_packet **pkts, size_t nr_pkts) { struct pio_pcap_file_device_context *pfile_dev_ctx = pdev->entity.pcap_file_dev_ctx; if (nullptr == pfile_dev_ctx) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid pcap_file_dev_ctx pointer."); return -1; } ssize_t res = -1; if (pfile_dev_ctx->is_dir == 0) { log_info("Start reading file:%s", pfile_dev_ctx->entity.file->file_name); /* read pcap file and dispatch pkts to pkt_queue */ pcap_file_pkts_dispatch(pfile_dev_ctx); /* get pkts from pkt_queue */ res = get_pcap_file_pkts(pfile_dev_ctx, rxq_id, pkts, nr_pkts); } else { log_info("Start reading directory:%s", pfile_dev_ctx->entity.dir->dir_name); /* read directory pcap files and dispatch pkts to pkt_queue */ pcap_directory_files_dispatch(pfile_dev_ctx, rxq_id); /* get pkts from pkt_queue[rxq_id] */ res = get_pcap_file_pkts(pfile_dev_ctx, rxq_id, pkts, nr_pkts); } return res; } void pio_pcap_file_device_pkt_free(__unused struct packet_io_device *pdev, __unused uint32_t qid, struct stellar_packet **pkts, size_t nr_pkts) { for (size_t i = 0; i < nr_pkts; i++) { struct pio_packet *p = (struct pio_packet *)pkts[i]; FREE(p); } } ssize_t pio_pcap_file_instance_create(struct packet_io_instance *pinst) { if (nullptr == pinst) { log_error(ST_ERR_PIO_PCAP_FILE_INSTANCE, "invalid pcap file instance pointer."); return -1; } pinst->entity.pcap_file_inst_ctx = CALLOC(struct pio_pcap_file_instance_context, 1); if (nullptr == pinst->entity.pcap_file_inst_ctx) { log_error(ST_ERR_PIO_PCAP_FILE_INSTANCE, "alloc pcap_file_inst_ctx failed."); return -1; } return 0; } void pio_pcap_file_instance_destroy(struct packet_io_instance *pinst) { if (nullptr == pinst) { return; } if (pinst->entity.pcap_file_inst_ctx != nullptr) { FREE(pinst->entity.pcap_file_inst_ctx); } struct packet_io_device *node = nullptr; while ((node = TAILQ_FIRST(&pinst->device_queue_head)) != nullptr) { TAILQ_REMOVE(&pinst->device_queue_head, node, next); pinst->dev_cnt--; pio_pcap_file_device_close(node); FREE(node); } } char *pio_pcap_file_device_buff_ctrlzone(struct stellar_packet *p, size_t *ctrlzone_len) { struct pio_packet *pkt = (struct pio_packet *)p; *ctrlzone_len = CUSTOM_ZONE_LEN; return (char *)pkt->pkt_hdr; } char *pio_pcap_file_device_buff_mtod(struct stellar_packet *p, size_t *data_len) { struct pio_packet *pkt = (struct pio_packet *)p; *data_len = pkt->pkt_len; return (char *)pkt->pkt_payload; }