/* ********************************************************************************************** * File: pcap_file.cpp * Description: pcap file runmode api * Authors: Liu WenTan * Date: 2022-07-15 * Copyright: (c) 2018-2022 Geedge Networks, Inc. All rights reserved. *********************************************************************************************** */ #include #include #include #include #include #include #include #include "utils.h" #include "util_errors.h" #include "logger.h" #include "time_helper.h" #include "pio_pcap_file.h" #include "packet_io.h" #include "packet_io_util.h" #include "packet_io_internal.h" struct safe_pending_file_queue g_pending_file_queue; /** * @brief validate path is a valid plain file or directory * * @retval failed (-1) successful (0), * if success, dir == nullptr <---> means path is plain file * dir != nullptr <---> means path is directory */ static ssize_t validate_directory_or_file(const char *path, DIR **dir) { DIR *temp_dir = nullptr; ssize_t ret = -1; temp_dir = opendir(path); if (nullptr == temp_dir) { switch (errno) { case EACCES: log_error(ST_ERR_FOPEN, "%s: Permission denied", path); break; case EBADF: log_error(ST_ERR_FOPEN, "%s: invalid file descriptor", path); break; case ENOTDIR: log_info("%s: is a plain file, not directory", path); ret = 0; break; default: log_error(ST_ERR_FOPEN, "%s: errno:%d", path, errno); } } else { *dir = temp_dir; ret = 0; } return ret; } /** * @brief get the timestamp of the first packet and rewind * * @retval true(success), false(error) */ static bool peek_first_packet_timestamp(struct pcap_plain_file_info *pfile_info) { int ret = pcap_next_ex(pfile_info->pcap_handle, &pfile_info->first_pkt_hdr, &pfile_info->first_pkt_data); if (ret <= 0 || (nullptr == pfile_info->first_pkt_hdr)) { log_error(ST_ERR_PCAP_OPEN_OFFLINE, "failed to get first packet timestamp"); return false; } pfile_info->first_pkt_ts.tv_sec = pfile_info->first_pkt_hdr->ts.tv_sec; pfile_info->first_pkt_ts.tv_usec = pfile_info->first_pkt_hdr->ts.tv_usec; return true; } static ssize_t init_pcap_file(struct pcap_plain_file_info *pfile_info) { char errbuf[PCAP_ERRBUF_SIZE] = ""; if (nullptr == pfile_info || nullptr == pfile_info->file_name) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid pfile_info pointer or file_name"); return -1; } pfile_info->pcap_handle = pcap_open_offline(pfile_info->file_name, errbuf); if (nullptr == pfile_info->pcap_handle) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "open pcap file:%s failed.", pfile_info->file_name); return -1; } pthread_mutex_init(&pfile_info->handle_mutex, nullptr); #if 0 if (pfile_info->shared != nullptr && pfile_info->shared->bpf_string != nullptr) { if (pcap_compile(pfile_info->pcap_handle, &pfile_info->filter, pfile_info->shared->bpf_string, 1, 0) < 0) { log_error(ST_ERR_BPF, "bpf compilation error %s for %s", pcap_geterr(pfile_info->pcap_handle), pfile_info->file_name); return -1; } if (pcap_setfilter(pfile_info->pcap_handle, &pfile_info->filter) < 0) { log_error(ST_ERR_BPF, "could not set bpf filter %s for %s", pcap_geterr(pfile_info->pcap_handle), pfile_info->file_name); pcap_freecode(&pfile_info->filter); return -1; } pcap_freecode(&pfile_info->filter); } #endif pfile_info->data_link = pcap_datalink(pfile_info->pcap_handle); if (!peek_first_packet_timestamp(pfile_info)) { return -1; } return 0; } static ssize_t pcap_plain_file_init(struct pio_pcap_file_device_context *pfile_dev_ctx, const char *file_name) { if (nullptr == pfile_dev_ctx) { return -1; } struct pcap_plain_file_info *pfile_info = CALLOC(struct pcap_plain_file_info, 1); if (nullptr == pfile_info) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "alloc pcap_plain_file_info failed."); return -1; } /* TODO: get conf and assign pfile_info */ int ret = strncpy_safe(pfile_info->file_name, file_name, sizeof(pfile_info->file_name)); if (ret < 0) { log_error(ST_ERR_STR_COPY, "pcap plain file name copy failed."); return -1; } pfile_info->shared = &pfile_dev_ctx->shared; ret = init_pcap_file(pfile_info); if (ret < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "init_pcap_file failed."); FREE(pfile_info); return -1; } else { pfile_dev_ctx->is_dir = 0; pfile_dev_ctx->entity.file = pfile_info; } return 0; } static ssize_t pcap_directory_file_init(struct pio_pcap_file_device_context *pfile_dev_ctx, const char *dir_name, DIR *directory) { if (nullptr == pfile_dev_ctx) { return -1; } struct pcap_file_directory_info *pdir_info = CALLOC(struct pcap_file_directory_info, 1); if (nullptr == pdir_info) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "alloc pcap_file_directory_info failed."); return -1; } /* TODO: get conf and assign pdir_info */ int ret = strncpy_safe(pdir_info->dir_name, dir_name, sizeof(pdir_info->dir_name)); if (ret < 0) { log_error(ST_ERR_STR_COPY, "pcap directory name copy failed."); return -1; } /* TODO: if should configurable */ pdir_info->delay = 30; pdir_info->shared = &pfile_dev_ctx->shared; pdir_info->directory = directory; pfile_dev_ctx->is_dir = 1; pfile_dev_ctx->entity.dir = pdir_info; /* init global pending file queue */ TAILQ_INIT(&g_pending_file_queue.file_queue_head); pthread_mutex_init(&g_pending_file_queue.queue_mutex, nullptr); return 0; } static ssize_t pcap_file_shared_init(struct pio_pcap_file_device_context *pfile_dev_ctx) { if (nullptr == pfile_dev_ctx) { return -1; } /* TODO: get conf and assign pfile_dev_ctx->shared */ if ((g_packet_io_config.common.mode == PACKET_IO_RUN_MODE_PCAP_FILE) && g_packet_io_config.pcap.bpf_string != nullptr) { memset(pfile_dev_ctx->shared.bpf_string, 0, sizeof(pfile_dev_ctx->shared.bpf_string)); ssize_t ret = strncpy_safe(pfile_dev_ctx->shared.bpf_string, g_packet_io_config.pcap.bpf_string, sizeof(pfile_dev_ctx->shared.bpf_string)); if (ret < 0) { log_error(ST_ERR_STR_COPY, "pcap file bpf string copy failed."); return -1; } } pfile_dev_ctx->shared.should_delete = g_packet_io_config.pcap.should_delete; /* init pcap file device packet queue */ for (uint32_t i = 0; i < VIRTUAL_QUEUE_MAX_NUM; i++) { pio_packet_queue_init(&pfile_dev_ctx->pkt_queues[i]); } return 0; } static void cleanup_pcap_plain_file_info(struct pcap_plain_file_info *pfile_info) { if (pfile_info != nullptr) { if (pfile_info->pcap_handle != nullptr) { pcap_close(pfile_info->pcap_handle); pfile_info->pcap_handle = nullptr; } if (pfile_info->file_name != nullptr) { if (pfile_info->shared != nullptr && pfile_info->shared->should_delete) { log_debug("Deleting pcap file:%s", pfile_info->file_name); if (unlink(pfile_info->file_name) != 0) { log_notice(ST_ERR_PCAP_FILE_DELETE_FAILED, "Failed to delete %s", pfile_info->file_name); } } pfile_info->shared = nullptr; } } } static void cleanup_pcap_directory_info(struct pcap_file_directory_info *pdir_info) { } ssize_t pio_pcap_file_device_open(struct packet_io_device *pdev) { ssize_t status = -1; DIR *directory = nullptr; if (nullptr == pdev) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid packet_io_device pointer."); return -1; } pdev->entity.pcap_file_dev_ctx = CALLOC(struct pio_pcap_file_device_context, 1); if (nullptr == pdev->entity.pcap_file_dev_ctx) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "alloc pcap_file_dev_ctx failed."); return -1; } pdev->entity.pcap_file_dev_ctx->pdev = pdev; status = pcap_file_shared_init(pdev->entity.pcap_file_dev_ctx); if (status < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "pcap file shared init failed."); return -1; } if (validate_directory_or_file(pdev->dev_name, &directory) != 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid path:%s (not plain file or directory)", pdev->dev_name); return -1; } if (nullptr == directory) { /* plain file */ status = pcap_plain_file_init(pdev->entity.pcap_file_dev_ctx, pdev->dev_name); if (status < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "pcap plain file init failed."); return -1; } } else { /* directory */ status = pcap_directory_file_init(pdev->entity.pcap_file_dev_ctx, pdev->dev_name, directory); if (status < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "pcap directory file init failed."); return -1; } } return 0; } ssize_t pio_pcap_file_device_close(struct packet_io_device *pdev) { if (nullptr == pdev) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid pdev pointer so close pcap file device failed!"); return -1; } if (pdev->entity.pcap_file_dev_ctx != nullptr) { if (pdev->entity.pcap_file_dev_ctx->entity.file != nullptr) { cleanup_pcap_plain_file_info(pdev->entity.pcap_file_dev_ctx->entity.file); } if (pdev->entity.pcap_file_dev_ctx->entity.dir != nullptr) { cleanup_pcap_directory_info(pdev->entity.pcap_file_dev_ctx->entity.dir); } for (uint32_t i = 0; i < VIRTUAL_QUEUE_MAX_NUM; i++) { if (pdev->entity.pcap_file_dev_ctx->pkt_queues[i].len != 0) { release_pio_packet_queue(&pdev->entity.pcap_file_dev_ctx->pkt_queues[i]); } } } FREE(pdev->entity.pcap_file_dev_ctx); return 0; } void pcap_file_pkt_callback_oneshot(char *user, struct pcap_pkthdr *pkt_hdr, u_char *pkt) { struct pio_pcap_file_device_context *pfile_dev_ctx = (struct pio_pcap_file_device_context *)user; uint32_t p_len = 0; if (pkt_hdr->caplen < DEFAULT_MTU) { p_len = COMMON_SIZE_OF_PIO_PACKET; } else { p_len = MAX_SIZE_OF_PIO_PACKET; } struct pio_packet *p = (struct pio_packet *)malloc(p_len); if (nullptr == p) { return; } memset(p, 0, p_len); p->pkt_hdr = p; p->pkt_payload = (uint8_t *)p + CUSTOM_ZONE_LEN; p->pkt_len = pkt_hdr->caplen; p->data_link = pfile_dev_ctx->entity.file->data_link; if (packet_copy_data((uint8_t *)p->pkt_payload, (uint8_t *)pkt, pkt_hdr->caplen)) { FREE(p); return; } /* nr_rxq <= VIRTUAL_QUEUE_MAX_NUM */ uint16_t nr_rxq = pfile_dev_ctx->pdev->rxq_num; uint16_t rxq_id = pio_packet_hash(p) % nr_rxq; /* hash to specific queue id and enqueue */ pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); pio_packet_enqueue(&pfile_dev_ctx->pkt_queues[rxq_id], p); pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); } static ssize_t pcap_directory_file_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id, struct stellar_packet **pkts, size_t nr_pkts) { if (pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr != nullptr) { pthread_mutex_lock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex); if (pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr != nullptr) { pcap_file_pkt_callback_oneshot((char *)pfile_dev_ctx, pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr, (u_char *)pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_data); pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr = nullptr; pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_data = nullptr; } pthread_mutex_unlock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex); } int packet_q_len = nr_pkts; ssize_t res = -1; pthread_mutex_lock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex); res = pcap_dispatch(pfile_dev_ctx->entity.dir->current_file[rxq_id]->pcap_handle, packet_q_len, (pcap_handler)pcap_file_pkt_callback_oneshot, (u_char *)pfile_dev_ctx); pthread_mutex_unlock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex); if (res < 0) { log_error(ST_ERR_PCAP_DISPATCH, "error code %ld %s for %s", res, pcap_geterr(pfile_dev_ctx->entity.dir->current_file[rxq_id]->pcap_handle), pfile_dev_ctx->entity.dir->current_file[rxq_id]->file_name); } else if (res == 0) { log_info("reach end of pcap file %s (error code %d)", pfile_dev_ctx->entity.dir->current_file[rxq_id]->file_name, res); //TODO: close pcap file } else { // success struct pio_packet *p = nullptr; size_t i = 0; uint32_t q_len = 0; pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); /* if pkt queue is empty */ if (pfile_dev_ctx->pkt_queues[rxq_id].len == 0) { pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); return 0; } do { p = pio_packet_dequeue(&pfile_dev_ctx->pkt_queues[rxq_id]); pkts[i] = (struct stellar_packet *)p; q_len = pfile_dev_ctx->pkt_queues[rxq_id].len; i++; } while ((q_len != 0) && (i < nr_pkts)); pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); if (q_len == 0) { res = i; } else { res = nr_pkts; } } return res; } static ssize_t pcap_file_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id, struct stellar_packet **pkts, size_t nr_pkts) { if (pfile_dev_ctx->entity.file->first_pkt_hdr != nullptr) { pthread_mutex_lock(&pfile_dev_ctx->entity.file->handle_mutex); if (pfile_dev_ctx->entity.file->first_pkt_hdr != nullptr) { pcap_file_pkt_callback_oneshot((char *)pfile_dev_ctx, pfile_dev_ctx->entity.file->first_pkt_hdr, (u_char *)pfile_dev_ctx->entity.file->first_pkt_data); pfile_dev_ctx->entity.file->first_pkt_hdr = nullptr; pfile_dev_ctx->entity.file->first_pkt_data = nullptr; } pthread_mutex_unlock(&pfile_dev_ctx->entity.file->handle_mutex); } int packet_q_len = nr_pkts; ssize_t res = -1; pthread_mutex_lock(&pfile_dev_ctx->entity.file->handle_mutex); res = pcap_dispatch(pfile_dev_ctx->entity.file->pcap_handle, packet_q_len, (pcap_handler)pcap_file_pkt_callback_oneshot, (u_char *)pfile_dev_ctx); pthread_mutex_unlock(&pfile_dev_ctx->entity.file->handle_mutex); if (res < 0) { log_error(ST_ERR_PCAP_DISPATCH, "error code %ld %s for %s", res, pcap_geterr(pfile_dev_ctx->entity.file->pcap_handle), pfile_dev_ctx->entity.file->file_name); } else { if (res == 0) { log_info("reach end of pcap file %s (error code %d)", pfile_dev_ctx->entity.file->file_name, res); } // success struct pio_packet *p = nullptr; size_t i = 0; uint32_t q_len = 0; pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); if (pfile_dev_ctx->pkt_queues[rxq_id].len != 0) { do { p = pio_packet_dequeue(&pfile_dev_ctx->pkt_queues[rxq_id]); q_len = pfile_dev_ctx->pkt_queues[rxq_id].len; pkts[i] = (struct stellar_packet *)p; i++; } while ((q_len != 0) && (i < nr_pkts)); } pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); if (q_len == 0) { res = i; } else { res = nr_pkts; } } return res; } static ssize_t pcap_directory_get_modified_time(char *pfile, struct timespec *out) { struct stat buf; ssize_t ret = -1; if (nullptr == pfile) { return ret; } if ((ret = stat(pfile, &buf)) != 0) { return ret; } out->tv_sec = buf.st_mtim.tv_sec; out->tv_nsec = buf.st_mtim.tv_nsec; return ret; } struct pending_file * find_pending_file_to_add(struct pio_pcap_file_device_context *pfile_dev_ctx, struct dirent *dir, struct timespec *deadline) { char abs_path[PATH_MAX] = {0}; snprintf(abs_path, sizeof(abs_path), "%s/%s", pfile_dev_ctx->entity.dir->dir_name, dir->d_name); struct timespec temp_time; memset(&temp_time, 0, sizeof(struct timespec)); if (pcap_directory_get_modified_time(abs_path, &temp_time) < 0) { log_debug("unable to get modified time on %s, skipping", abs_path); return nullptr; } /* skip files outside of out time range */ if (compare_timespec(&temp_time, &pfile_dev_ctx->shared.last_processed_ts) <= 0) { log_debug("skipping old file %s", abs_path); return nullptr; } else if (compare_timespec(&temp_time, deadline) >= 0) { log_debug("skipping new file %s", abs_path); return nullptr; } struct pending_file *file_to_add = CALLOC(struct pending_file, 1); ssize_t ret = strncpy_safe(file_to_add->file_name, abs_path, sizeof(file_to_add->file_name)); if (ret < 0) { log_error(ST_ERR_STR_COPY, "file_to_add file name copy failed."); return nullptr; } copy_timespec(&temp_time, &file_to_add->modified_time); log_info("found \"%s\" at %" PRIuMAX, file_to_add->file_name, (uintmax_t)timespec_to_millisecond(&file_to_add->modified_time)); return file_to_add; } static ssize_t pcap_directory_insert_file(struct pending_file *file_to_add) { if (nullptr == file_to_add) { log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "invalid directory or file parameters."); return -1; } log_debug("inserting %s into pending file queue", file_to_add->file_name); pthread_mutex_lock(&g_pending_file_queue.queue_mutex); if (TAILQ_EMPTY(&g_pending_file_queue.file_queue_head)) { TAILQ_INSERT_TAIL(&g_pending_file_queue.file_queue_head, file_to_add, next); } else { /* pending file queue is not empty */ struct pending_file *file_to_compare = TAILQ_FIRST(&g_pending_file_queue.file_queue_head); while (file_to_compare != nullptr) { if (compare_timespec(&file_to_add->modified_time, &file_to_compare->modified_time) < 0) { TAILQ_INSERT_BEFORE(file_to_compare, file_to_add, next); file_to_compare = nullptr; } else if (compare_timespec(&file_to_add->modified_time, &file_to_compare->modified_time) > 0) { struct pending_file *next_file_to_compare = TAILQ_NEXT(file_to_compare, next); if (next_file_to_compare == nullptr) { TAILQ_INSERT_AFTER(&g_pending_file_queue.file_queue_head, file_to_compare, file_to_add, next); } file_to_compare = next_file_to_compare; } else { /* find same file, ignore it */ printf("find same file\n"); break; } } } pthread_mutex_unlock(&g_pending_file_queue.queue_mutex); return 0; } static ssize_t pcap_directory_collect_pending_files(struct pio_pcap_file_device_context *pfile_dev_ctx, struct timespec *deadline) { if (nullptr == pfile_dev_ctx) { return -1; } if (strlen(pfile_dev_ctx->entity.dir->dir_name) == 0) { log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "invalid directory name."); return -1; } struct dirent *dir = nullptr; struct pending_file *file_to_add = nullptr; while ((dir = readdir(pfile_dev_ctx->entity.dir->directory)) != nullptr) { /* ignore non plain file */ if (dir->d_type != DT_REG) { continue; } /* ignore . and .. */ if (strcmp(dir->d_name, ".") == 0 || strcmp(dir->d_name, "..") == 0) { continue; } file_to_add = find_pending_file_to_add(pfile_dev_ctx, dir, deadline); if (nullptr == file_to_add) { continue; } if (pcap_directory_insert_file(file_to_add) < 0) { log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "failed to insert file into directory"); FREE(file_to_add); return -1; } } return 0; } static ssize_t pcap_directory_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id, struct stellar_packet **pkts, size_t nr_pkts) { ssize_t res = -1; struct timespec deadline; memset(&deadline, 0, sizeof(struct timespec)); get_current_timespec(&deadline); /* the newest file which can be processed */ deadline.tv_sec = deadline.tv_sec - pfile_dev_ctx->entity.dir->delay; /* collect pending files in current directory */ if (pcap_directory_collect_pending_files(pfile_dev_ctx, &deadline) < 0) { log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "failed to collect pending files in directory."); return -1; } struct timespec last_time_seen; memset(&last_time_seen, 0, sizeof(struct timespec)); /* not open file yet */ if (nullptr == pfile_dev_ctx->entity.dir->current_file[rxq_id]) { pthread_mutex_lock(&g_pending_file_queue.queue_mutex); /* file_queue is empty */ if (TAILQ_EMPTY(&g_pending_file_queue.file_queue_head)) { pthread_mutex_unlock(&g_pending_file_queue.queue_mutex); log_info("directory %s has no files to process", pfile_dev_ctx->entity.dir->dir_name); return 0; } struct pending_file *current_file = TAILQ_FIRST(&g_pending_file_queue.file_queue_head); TAILQ_REMOVE(&g_pending_file_queue.file_queue_head, current_file, next); pthread_mutex_unlock(&g_pending_file_queue.queue_mutex); pfile_dev_ctx->entity.dir->pending_file[rxq_id] = current_file; log_info("processing file %s", current_file->file_name); struct pcap_plain_file_info *pfile_info = CALLOC(struct pcap_plain_file_info, 1); if (nullptr == pfile_info) { log_error(ST_ERR_MEM_ALLOC, "alloc pcap_plain_file_info failed."); return -1; } res = strncpy_safe(pfile_info->file_name, current_file->file_name, sizeof(pfile_info->file_name)); if (res < 0) { log_error(ST_ERR_STR_COPY, "pfile_info file name copy failed."); FREE(current_file); FREE(pfile_info); return -1; } pfile_info->shared = &pfile_dev_ctx->shared; if (init_pcap_file(pfile_info) < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "init_pcap_file failed."); FREE(current_file); FREE(pfile_info); return -1; } else { pfile_dev_ctx->entity.dir->current_file[rxq_id] = pfile_info; res = pcap_directory_file_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts); if (res < 0) { FREE(current_file); return -1; } log_info("processed file %s, processed up to %" PRIuMAX, current_file->file_name, (uintmax_t)timespec_to_millisecond(¤t_file->modified_time)); if (compare_timespec(¤t_file->modified_time, &last_time_seen) > 0) { copy_timespec(¤t_file->modified_time, &last_time_seen); } if (res == 0) { // reach the end of the file cleanup_pcap_plain_file_info(pfile_dev_ctx->entity.dir->current_file[rxq_id]); FREE(pfile_dev_ctx->entity.dir->current_file[rxq_id]); FREE(current_file); } } } else { /* file has been opened */ res = pcap_directory_file_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts); if (res < 0) { return -1; } else if (res == 0) { cleanup_pcap_plain_file_info(pfile_dev_ctx->entity.dir->current_file[rxq_id]); FREE(pfile_dev_ctx->entity.dir->current_file[rxq_id]); FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); } } if (compare_timespec(&last_time_seen, &pfile_dev_ctx->shared.last_processed_ts) > 0) { log_info("updating processed to %" PRIuMAX, (uintmax_t)timespec_to_millisecond(&last_time_seen)); copy_timespec(&last_time_seen, &pfile_dev_ctx->shared.last_processed_ts); } return res; } ssize_t pio_pcap_file_device_receive(struct packet_io_device *pdev, uint32_t rxq_id, struct stellar_packet **pkts, size_t nr_pkts) { struct pio_pcap_file_device_context *pfile_dev_ctx = pdev->entity.pcap_file_dev_ctx; if (nullptr == pfile_dev_ctx) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid pcap_file_dev_ctx pointer."); return -1; } ssize_t res = -1; if (pfile_dev_ctx->is_dir == 0) { log_info("Start reading file:%s", pfile_dev_ctx->entity.file->file_name); res = pcap_file_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts); } else { log_info("Start reading directory:%s", pfile_dev_ctx->entity.dir->dir_name); res = pcap_directory_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts); } return res; } void pio_pcap_file_device_pkt_free(__unused struct packet_io_device *pdev, __unused uint32_t qid, struct stellar_packet **pkts, size_t nr_pkts) { for (size_t i = 0; i < nr_pkts; i++) { struct pio_packet *p = (struct pio_packet *)pkts[i]; FREE(p); } } ssize_t pio_pcap_file_instance_create(struct packet_io_instance *pinst) { if (nullptr == pinst) { log_error(ST_ERR_PIO_PCAP_FILE_INSTANCE, "invalid pcap file instance pointer."); return -1; } pinst->entity.pcap_file_inst_ctx = CALLOC(struct pio_pcap_file_instance_context, 1); if (nullptr == pinst->entity.pcap_file_inst_ctx) { log_error(ST_ERR_PIO_PCAP_FILE_INSTANCE, "alloc pcap_file_inst_ctx failed."); return -1; } return 0; } void pio_pcap_file_instance_destroy(struct packet_io_instance *pinst) { if (nullptr == pinst) { return; } if (pinst->entity.pcap_file_inst_ctx != nullptr) { FREE(pinst->entity.pcap_file_inst_ctx); } struct packet_io_device *node = nullptr; while ((node = TAILQ_FIRST(&pinst->device_queue_head)) != nullptr) { TAILQ_REMOVE(&pinst->device_queue_head, node, next); pinst->dev_cnt--; pio_pcap_file_device_close(node); FREE(node); } } char *pio_pcap_file_device_buff_ctrlzone(struct stellar_packet *p, size_t *ctrlzone_len) { struct pio_packet *pkt = (struct pio_packet *)p; *ctrlzone_len = CUSTOM_ZONE_LEN; return (char *)pkt->pkt_hdr; } char *pio_pcap_file_device_buff_mtod(struct stellar_packet *p, size_t *data_len) { struct pio_packet *pkt = (struct pio_packet *)p; *data_len = pkt->pkt_len; return (char *)pkt->pkt_payload; }