From 9cf895fa074a396c59b111558274cc00eccd3c15 Mon Sep 17 00:00:00 2001 From: liuwentan Date: Fri, 5 Aug 2022 11:03:11 +0800 Subject: [PATCH] [PACKET_IO] finish pcap_file mode --- sdk/include/packet.h | 7 +- sdk/include/util_errors.h | 5 + src/CMakeLists.txt | 2 +- src/common/global_var.h | 15 + src/common/packet_queue.cpp | 74 +++ src/common/packet_queue.h | 24 + src/common/time_helper.cpp | 50 ++ src/common/time_helper.h | 22 + src/main.cpp | 1 - src/packet_io/CMakeLists.txt | 2 + src/packet_io/marsio_mode/pio_marsio.cpp | 14 +- src/packet_io/marsio_mode/pio_marsio.h | 10 +- src/packet_io/packet_io.cpp | 4 +- src/packet_io/packet_io.h | 8 +- .../pcap_file_mode/pio_pcap_file.cpp | 534 +++++++++++++++++- src/packet_io/pcap_file_mode/pio_pcap_file.h | 36 +- .../pcap_live_mode/pio_pcap_live.cpp | 2 +- src/packet_io/pcap_live_mode/pio_pcap_live.h | 13 +- src/packet_io/test/CMakeLists.txt | 1 + 19 files changed, 773 insertions(+), 51 deletions(-) create mode 100644 src/common/packet_queue.cpp create mode 100644 src/common/packet_queue.h create mode 100644 src/common/time_helper.cpp create mode 100644 src/common/time_helper.h diff --git a/sdk/include/packet.h b/sdk/include/packet.h index 7adfffd..ba6493e 100644 --- a/sdk/include/packet.h +++ b/sdk/include/packet.h @@ -1,9 +1,12 @@ #pragma once +#include #include "marsio.h" struct packet { - /* queue id which the packet belongs to */ - int qid; + void *raw_pkt; + uint32_t pkt_len; + struct packet *prev; + struct packet *next; }; diff --git a/sdk/include/util_errors.h b/sdk/include/util_errors.h index b95a7e3..1321821 100644 --- a/sdk/include/util_errors.h +++ b/sdk/include/util_errors.h @@ -23,6 +23,11 @@ typedef enum { ST_ERR_PIO_MARSIO_DEVICE, ST_ERR_PIO_PCAP_FILE_DEVICE, ST_ERR_PIO_PCAP_LIVE_DEVICE, + ST_ERR_PCAP_OPEN_OFFLINE, + ST_ERR_PCAP_FILE_DELETE_FAILED, + ST_ERR_PCAP_DISPATCH, + ST_ERR_PCAP_FILE_COLLECT_FAILED, ST_ERR_FOPEN, + ST_ERR_BPF, ST_ERR_MAX } error_code_t; \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a429c3c..3942b9b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -17,4 +17,4 @@ add_executable(stellar main.cpp ) -target_link_libraries(stellar packet_io plugin_manager session_manager http dl) \ No newline at end of file +target_link_libraries(stellar packet_io plugin_manager session_manager http dl pcap) \ No newline at end of file diff --git a/src/common/global_var.h b/src/common/global_var.h index 05f8ab6..73ae118 100644 --- a/src/common/global_var.h +++ b/src/common/global_var.h @@ -12,8 +12,10 @@ #include #include +#include #define DEV_MAX_CNT 64 +#define STR_MAX_LEN 1024 enum packet_io_run_mode { PACKET_IO_RUN_MODE_PCAP_FILE, @@ -42,6 +44,19 @@ struct packet_io_config { /* device counts */ uint32_t dev_cnt; + + /* bpf filter string, such as "tcp and port 25"*/ + char bpf_string[STR_MAX_LEN]; + + /* delete after the pcap file is read */ + bool should_delete; + + /* loop read pcap file in directory */ + bool should_loop; + + time_t delay; + + time_t poll_interval; }; struct lib_config { diff --git a/src/common/packet_queue.cpp b/src/common/packet_queue.cpp new file mode 100644 index 0000000..70974a7 --- /dev/null +++ b/src/common/packet_queue.cpp @@ -0,0 +1,74 @@ +/* +********************************************************************************************** +* File: packet_queue.cpp +* Description: +* Authors: Liu WenTan +* Date: 2022-07-15 +* Copyright: (c) 2018-2022 Geedge Networks, Inc. All rights reserved. +*********************************************************************************************** +*/ + +#include "../../sdk/include/utils.h" +#include "packet_queue.h" + +void packet_enqueue(struct packet_queue *q, struct packet *p) +{ + if (nullptr == p) + return; + + /* more packets in queue */ + if (q->top != nullptr) { + p->prev = nullptr; + p->next = q->top; + q->top->prev = p; + q->top = p; + /* only packet */ + } else { + p->prev = nullptr; + p->next = nullptr; + q->top = p; + q->bot = p; + } + q->len++; +} + +struct packet *packet_dequeue(struct packet_queue *q) +{ + struct packet *p = NULL; + + /* if the queue is empty there are no packets left. */ + if (q->len == 0) { + return nullptr; + } + + q->len--; + + /* pull the bottom packet from the queue */ + p = q->bot; + + /* more packets in queue */ + if (q->bot->prev != nullptr) { + q->bot = q->bot->prev; + q->bot->next = nullptr; + /* just the one we remove, so now empty */ + } else { + q->top = nullptr; + q->bot = nullptr; + } + + p->next = nullptr; + p->prev = nullptr; + return p; +} + +void release_packet_queue(struct packet_queue *q) +{ + if (nullptr == q) { + return; + } + + while (q->len != 0) { + struct packet *p = packet_dequeue(q); + FREE(p); + } +} \ No newline at end of file diff --git a/src/common/packet_queue.h b/src/common/packet_queue.h new file mode 100644 index 0000000..69ad9e8 --- /dev/null +++ b/src/common/packet_queue.h @@ -0,0 +1,24 @@ +/* +********************************************************************************************** +* File: packet_queue.h +* Description: packet queue structure and api +* Authors: Liu WenTan +* Date: 2022-07-15 +* Copyright: (c) 2018-2022 Geedge Networks, Inc. All rights reserved. +*********************************************************************************************** +*/ + +#pragma once + +#include "../../sdk/include/packet.h" + +struct packet_queue { + struct packet *top; + struct packet *bot; + uint32_t len; + pthread_mutex_t mutex_q; +}; + +void packet_enqueue(struct packet_queue *, struct packet *); +struct packet *packet_dequeue(struct packet_queue *); +void release_packet_queue(struct packet_queue *); \ No newline at end of file diff --git a/src/common/time_helper.cpp b/src/common/time_helper.cpp new file mode 100644 index 0000000..2545ba7 --- /dev/null +++ b/src/common/time_helper.cpp @@ -0,0 +1,50 @@ +/* +********************************************************************************************** +* File: time_help.cpp +* Description: +* Authors: Liu WenTan +* Date: 2022-07-15 +* Copyright: (c) 2018-2022 Geedge Networks, Inc. All rights reserved. +*********************************************************************************************** +*/ + +#include + +#include "time_helper.h" + +void get_current_timespec(struct timespec *tm) +{ + struct timeval now; + if(gettimeofday(&now, NULL) == 0) { + tm->tv_sec = now.tv_sec; + tm->tv_nsec = now.tv_usec * 1000L; + } +} + +int compare_timespec(struct timespec *left, struct timespec *right) +{ + if (left->tv_sec < right->tv_sec) { + return -1; + } else if (left->tv_sec > right->tv_sec) { + return 1; + } else { + if (left->tv_nsec < right->tv_nsec) { + return -1; + } else if (left->tv_nsec > right->tv_nsec) { + return 1; + } else { + return 0; + } + } +} + +void copy_timespec(struct timespec *from, struct timespec *to) +{ + to->tv_sec = from->tv_sec; + to->tv_nsec = from->tv_nsec; +} + +uint64_t timespec_to_millisecond(const struct timespec* ts) +{ + return ts->tv_sec * 1000L + ts->tv_nsec / 1000000L; +} \ No newline at end of file diff --git a/src/common/time_helper.h b/src/common/time_helper.h new file mode 100644 index 0000000..c20b00d --- /dev/null +++ b/src/common/time_helper.h @@ -0,0 +1,22 @@ +/* +********************************************************************************************** +* File: time_help.h +* Description: api about time +* Authors: Liu WenTan +* Date: 2022-07-15 +* Copyright: (c) 2018-2022 Geedge Networks, Inc. All rights reserved. +*********************************************************************************************** +*/ + +#pragma once + +#include +#include + +void get_current_timespec(struct timespec *tm); + +int compare_timespec(struct timespec *left, struct timespec *right); + +void copy_timespec(struct timespec *from, struct timespec *to); + +uint64_t timespec_to_millisecond(const struct timespec* ts); \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index abdec48..8b8501d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -7,7 +7,6 @@ *********************************************************************************************** */ -#include "./common/common.h" #include "./common/global_var.h" #include "./packet_io/packet_io.h" #include "./session_manager/session_manager.h" diff --git a/src/packet_io/CMakeLists.txt b/src/packet_io/CMakeLists.txt index 8cca713..0e45228 100644 --- a/src/packet_io/CMakeLists.txt +++ b/src/packet_io/CMakeLists.txt @@ -1,6 +1,8 @@ add_library(packet_io ../common/global_var.cpp + ../common/packet_queue.cpp + ../common/time_helper.cpp packet_io.cpp pcap_live_mode/pio_pcap_live.cpp pcap_file_mode/pio_pcap_file.cpp diff --git a/src/packet_io/marsio_mode/pio_marsio.cpp b/src/packet_io/marsio_mode/pio_marsio.cpp index 0359cdd..f0159a5 100644 --- a/src/packet_io/marsio_mode/pio_marsio.cpp +++ b/src/packet_io/marsio_mode/pio_marsio.cpp @@ -14,7 +14,6 @@ #include "pio_marsio.h" #include "../packet_io.h" #include "../../common/global_var.h" -#include "../../common/common.h" #include "../../sdk/include/logger.h" #include "../../sdk/include/utils.h" #include "../../sdk/include/util_errors.h" @@ -265,7 +264,7 @@ static int pio_get_marsio_dll_function_entries(void) return 0; } -int pio_marsio_device_open(struct packet_io_device *pdev, const char *dev_name, uint32_t nr_rxq, uint32_t nr_txq) +int pio_marsio_device_open(struct packet_io_device *pdev) { if (nullptr == pdev) { log_error(ST_ERR_PIO_MARSIO_DEVICE, "invalid packet_io_device pointer."); @@ -286,9 +285,9 @@ int pio_marsio_device_open(struct packet_io_device *pdev, const char *dev_name, struct mr_instance *mr_inst_handle = pdev->ppio_inst->entity.marsio_inst_ctx->mr_inst_handle; /* marsio_open_device() return marsio device handle*/ pdev->entity.marsio_dev_ctx->mr_dev_handle = \ - g_marsio_dll_func.marsio_open_device(mr_inst_handle, dev_name, nr_rxq, nr_txq); + g_marsio_dll_func.marsio_open_device(mr_inst_handle, pdev->dev_name, pdev->rxq_num, pdev->txq_num); if (nullptr == pdev->entity.marsio_dev_ctx->mr_dev_handle) { - log_error(ST_ERR_PIO_MARSIO_DEVICE, "marsio_open_device:%s error\n", dev_name); + log_error(ST_ERR_PIO_MARSIO_DEVICE, "marsio_open_device:%s error\n", pdev->dev_name); return -1; } @@ -395,9 +394,14 @@ static int marsio_instance_init(struct packet_io_instance *pinst, int wrk_thread */ int pio_marsio_instance_create(struct packet_io_instance *pinst, int wrk_thread_num) { + if (nullptr == pinst) { + log_error(ST_ERR_PIO_MARSIO_INSTANCE, "invalid marsio instance pointer."); + return -1; + } + pinst->entity.marsio_inst_ctx = CALLOC(struct pio_marsio_instance_context, 1); if (nullptr == pinst->entity.marsio_inst_ctx) { - log_error(ST_ERR_PIO_MARSIO_INSTANCE, "alloc marsio instance context failed."); + log_error(ST_ERR_PIO_MARSIO_INSTANCE, "alloc marsio_inst_ctx failed."); return -1; } diff --git a/src/packet_io/marsio_mode/pio_marsio.h b/src/packet_io/marsio_mode/pio_marsio.h index 0884afc..9e77354 100644 --- a/src/packet_io/marsio_mode/pio_marsio.h +++ b/src/packet_io/marsio_mode/pio_marsio.h @@ -83,7 +83,7 @@ struct pio_marsio_instance_context { }; /* - * struct pio_marsio_device - marsio device abstract + * struct pio_marsio_device_context - marsio device context * @mr_dev_handle: marsio device handle * if marsio device receive packets, use mr_dev_handle * @mr_sendpath_handle: marsio sendpath handle @@ -98,11 +98,11 @@ struct pio_marsio_device_context { * @brief open marsio device * * @param pdev: the marsio device's pointer - * @param dev_name: device name, such as eth1, eth2 ... - * @param nr_rxq: number of the packet receiving queues for the device - * @param nr_txq: number of the packet sending queues for the device +* pdev->dev_name: the name of marsio device, such as eth1, eth2, ... + * pdev->rxq_num: number of the packet receiving queues for the device + * pdev->txq_num: number of the packet sending queues for the device **/ -int pio_marsio_device_open(struct packet_io_device *pdev, const char *dev_name, uint32_t nr_rxq, uint32_t nr_txq); +int pio_marsio_device_open(struct packet_io_device *pdev); /* * @brief close pcap_live device diff --git a/src/packet_io/packet_io.cpp b/src/packet_io/packet_io.cpp index ae2c99e..9df4983 100644 --- a/src/packet_io/packet_io.cpp +++ b/src/packet_io/packet_io.cpp @@ -101,6 +101,8 @@ packet_io_device_open(struct packet_io_instance *pinst, const char *dev_name, ui } strncpy(ppio_dev->dev_name, dev_name, strlen(dev_name)); + ppio_dev->rxq_num = nr_rxq; + ppio_dev->txq_num = nr_txq; ppio_dev->ppio_inst = pinst; ppio_dev->dev_ops = &pio_device_ops_array[pinst->mode]; @@ -110,7 +112,7 @@ packet_io_device_open(struct packet_io_instance *pinst, const char *dev_name, ui **/ pinst->devices[pinst->dev_cnt++] = ppio_dev; - int ret = ppio_dev->dev_ops->open(ppio_dev, dev_name, nr_rxq, nr_txq); + int ret = ppio_dev->dev_ops->open(ppio_dev); if (ret < 0) { log_error(ST_ERR_PIO_DEVICE, "packet_io device open failed."); FREE(ppio_dev); diff --git a/src/packet_io/packet_io.h b/src/packet_io/packet_io.h index c9b71a9..f96160b 100644 --- a/src/packet_io/packet_io.h +++ b/src/packet_io/packet_io.h @@ -56,7 +56,7 @@ struct packet_io_instance { }; struct pio_device_operations { - int (*open)(struct packet_io_device *pinst, const char *dev_name, uint32_t nr_rxq, uint32_t nr_txq); + int (*open)(struct packet_io_device *pdev); int (*close)(struct packet_io_device *pdev); @@ -74,6 +74,12 @@ struct packet_io_device { /* device operations */ struct pio_device_operations *dev_ops; + /* number of receive queue */ + uint32_t rxq_num; + + /* number of send queue */ + uint32_t txq_num; + /* packet io device context */ union { struct pio_pcap_file_device_context *pcap_file_dev_ctx; diff --git a/src/packet_io/pcap_file_mode/pio_pcap_file.cpp b/src/packet_io/pcap_file_mode/pio_pcap_file.cpp index 661b68a..f4a5d68 100644 --- a/src/packet_io/pcap_file_mode/pio_pcap_file.cpp +++ b/src/packet_io/pcap_file_mode/pio_pcap_file.cpp @@ -10,12 +10,23 @@ #include #include +#include +#include +#include +#include #include "pio_pcap_file.h" #include "../packet_io.h" #include "../../sdk/include/utils.h" #include "../../sdk/include/util_errors.h" #include "../../sdk/include/logger.h" +#include "../../common/time_helper.h" + +#define SET_PKT_LEN(p, len) do { \ + (p)->pkt_len = (len); \ + } while(0) + +#define GET_PKT_DIRECT_DATA(p) (uint8_t *)((p) + 1) /* * @brief validate path is a valid plain file or directory @@ -24,7 +35,8 @@ * if success, dir == nullptr <---> means path is plain file * dir != nullptr <---> means path is directory **/ -static int validate_directory_or_file(const char *path, DIR **dir) { +static int validate_directory_or_file(const char *path, DIR **dir) +{ DIR *temp_dir = nullptr; int ret = -1; @@ -52,53 +64,211 @@ static int validate_directory_or_file(const char *path, DIR **dir) { return ret; } -static int pcap_plain_file_init(struct pio_pcap_file_device_context *pfile_dev_ctx) { - struct pcap_plain_file_info *pfile_info = CALLOC(struct pcap_plain_file_info, 1); - if (nullptr == pfile_info) { - log_error(ST_ERR_PIO_PCAP_FILE_DEVICE); +/* + * @brief get the timestamp of the first packet and rewind + * + * @retval true(success), false(error) +*/ +static bool peek_first_packet_timestamp(struct pcap_plain_file_info *pfile_info) +{ + int ret = pcap_next_ex(pfile_info->pcap_handle, &pfile_info->first_pkt_hdr, + &pfile_info->first_pkt_data); + if (ret <= 0 || (nullptr == pfile_info->first_pkt_hdr)) { + log_error(ST_ERR_PCAP_OPEN_OFFLINE, "failed to get first packet timestamp"); + return false; + } + + pfile_info->first_pkt_ts.tv_sec = pfile_info->first_pkt_hdr->ts.tv_sec; + pfile_info->first_pkt_ts.tv_usec = pfile_info->first_pkt_hdr->ts.tv_usec; + + return true; +} + +static int init_pcap_file(struct pcap_plain_file_info *pfile_info) +{ + char errbuf[PCAP_ERRBUF_SIZE] = ""; + + if (nullptr == pfile_info || nullptr == pfile_info->file_name) { + log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid pfile_info pointer or file_name"); + return -1; + } + + pfile_info->pcap_handle = pcap_open_offline(pfile_info->file_name, errbuf); + if (nullptr == pfile_info->pcap_handle) { + log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "open pcap file:%s failed.", pfile_info->file_name); + return -1; + } + + pthread_mutex_init(&pfile_info->handle_mutex, NULL); + + if (pfile_info->shared != nullptr && pfile_info->shared->bpf_string != nullptr) { + if (pcap_compile(pfile_info->pcap_handle, &pfile_info->filter, + pfile_info->shared->bpf_string, 1, 0) < 0) { + log_error(ST_ERR_BPF, "bpf compilation error %s for %s", + pcap_geterr(pfile_info->pcap_handle), pfile_info->file_name); + return -1; + } + + if (pcap_setfilter(pfile_info->pcap_handle, &pfile_info->filter) < 0) { + log_error(ST_ERR_BPF, "could not set bpf filter %s for %s", + pcap_geterr(pfile_info->pcap_handle, pfile_info->file_name)); + pcap_freecode(&pfile_info->filter); + return -1; + } + pcap_freecode(&pfile_info->filter); + } + + pfile_info->data_link = pcap_datalink(pfile_info->pcap_handle); + if (!peek_first_packet_timestamp(pfile_info)) { return -1; } return 0; } -static int pcap_directory_file_init(struct pio_pcap_file_device_context *pfile_dev_ctx) { +static int pcap_plain_file_init(struct pio_pcap_file_device_context *pfile_dev_ctx, const char *file_name) +{ + if (nullptr == pfile_dev_ctx) { + return -1; + } + + struct pcap_plain_file_info *pfile_info = CALLOC(struct pcap_plain_file_info, 1); + if (nullptr == pfile_info) { + log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "alloc pcap_plain_file_info failed."); + return -1; + } + + /* TODO: get conf and assign pfile_info */ + strncpy(pfile_info->file_name, file_name, strlen(file_name)); + pfile_info->shared = &pfile_dev_ctx->shared; + int ret = init_pcap_file(pfile_info); + if (ret < 0) { + log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "init_pcap_file failed."); + FREE(pfile_info); + return -1; + } else { + pfile_dev_ctx->is_dir = 0; + pfile_dev_ctx->entity.file = pfile_info; + } + + return 0; +} + +static int pcap_directory_file_init(struct pio_pcap_file_device_context *pfile_dev_ctx, const char *dir_name, DIR *directory) +{ + if (nullptr == pfile_dev_ctx) { + return -1; + } + + struct pcap_file_directory_info *pdir_info = CALLOC(struct pcap_file_directory_info, 1); + if (nullptr == pdir_info) { + log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "alloc pcap_file_directory_info failed."); + return -1; + } + + /* TODO: get conf and assign pdir_info */ + strncpy(pdir_info->dir_name, dir_name, strlen(dir_name)); + //pdir_info->should_loop = false; + //pdir_info->delay = 30; + //pdir_info->poll_interval = 5; + pdir_info->shared = &pfile_dev_ctx->shared; + pdir_info->directory = directory; + TAILQ_INIT(&pdir_info->file_queue_head); + + pfile_dev_ctx->is_dir = 1; + pfile_dev_ctx->entity.dir = pdir_info; return 0; } -static int pcap_file_shared_init(struct pio_pcap_file_device_context *pfile_dev_ctx) { +static int pcap_file_shared_init(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t nr_rxq) +{ + if (nullptr == pfile_dev_ctx) { + return -1; + } + + /* TODO: get conf and assign pfile_dev_ctx->shared */ + if ((g_engine_instance.config.packet_io.mode == PACKET_IO_RUN_MODE_PCAP_FILE) && + g_engine_instance.config.packet_io.bpf_string != nullptr) { + strncpy(pfile_dev_ctx->shared.bpf_string, g_engine_instance.config.packet_io.bpf_string, + strlen(g_engine_instance.config.packet_io.bpf_string)); + } + + pfile_dev_ctx->shared.should_delete = g_engine_instance.config.packet_io.should_delete; + + /* init pcap file device packet queue */ + memset(pfile_dev_ctx->pkt_queues, 0, sizeof(pfile_dev_ctx->pkt_queues)); + + for (uint32_t i = 0; i < PKT_QUEUE_MAX_NUM; i++) { + pthread_mutex_init(&pfile_dev_ctx->pkt_queues[i].mutex_q, nullptr); + } return 0; } -int pio_pcap_file_device_open(struct packet_io_device *pdev, const char *path, uint32_t nr_rxq, uint32_t nr_txq) +static void cleanup_pcap_plain_file_info(struct pcap_plain_file_info *pfile_info) +{ + if (pfile_info != nullptr) { + if (pfile_info->pcap_handle != nullptr) { + pcap_close(pfile_info->pcap_handle); + pfile_info->pcap_handle = nullptr; + } + + if (pfile_info->file_name != nullptr) { + if (pfile_info->shared != nullptr && pfile_info->shared->should_delete) { + log_debug("Deleting pcap file:%s", pfile_info->file_name); + if (unlink(pfile_info->file_name) != 0) { + log_notice(ST_ERR_PCAP_FILE_DELETE_FAILED, "Failed to delete %s", + pfile_info->file_name); + } + } + pfile_info->shared = nullptr; + } + } +} + +static void cleanup_pcap_directory_info(struct pcap_file_directory_info *pdir_info) +{ + +} + +int pio_pcap_file_device_open(struct packet_io_device *pdev) { int status = -1; DIR *directory = nullptr; - - status = pcap_file_shared_init(pdev->entity.pcap_file_dev_ctx); + if (nullptr == pdev) { + log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid packet_io_device pointer."); + return -1; + } + + pdev->entity.pcap_file_dev_ctx = CALLOC(struct pio_pcap_file_device_context, 1); + if (nullptr == pdev->entity.pcap_file_dev_ctx) { + log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "alloc pcap_file_dev_ctx failed."); + return -1; + } + + status = pcap_file_shared_init(pdev->entity.pcap_file_dev_ctx, pdev->rxq_num); if (status < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "pcap file shared init failed."); return -1; } - if (validate_directory_or_file(path, &directory) != 0) { - log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid path:%s (not plain file or directory)", path); + if (validate_directory_or_file(pdev->dev_name, &directory) != 0) { + log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid path:%s (not plain file or directory)", pdev->dev_name); return -1; } if (nullptr == directory) { /* plain file */ - status = pcap_plain_file_init(pdev->entity.pcap_file_dev_ctx); + status = pcap_plain_file_init(pdev->entity.pcap_file_dev_ctx, pdev->dev_name); if (status < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "pcap plain file init failed."); return -1; } } else { /* directory */ - status = pcap_directory_file_init(pdev->entity.pcap_file_dev_ctx); + status = pcap_directory_file_init(pdev->entity.pcap_file_dev_ctx, pdev->dev_name, directory); if (status < 0) { log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "pcap directory file init failed."); return -1; @@ -114,17 +284,335 @@ int pio_pcap_file_device_close(struct packet_io_device *pdev) log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid pdev pointer so close pcap file device failed!"); return -1; } + + if (pdev->entity.pcap_file_dev_ctx->entity.file != nullptr) { + cleanup_pcap_plain_file_info(pdev->entity.pcap_file_dev_ctx->entity.file); + } + + if (pdev->entity.pcap_file_dev_ctx->entity.dir != nullptr) { + cleanup_pcap_directory_info(pdev->entity.pcap_file_dev_ctx->entity.dir); + } + + for (uint32_t i = 0; i < PKT_QUEUE_MAX_NUM; i++) { + if (pdev->entity.pcap_file_dev_ctx->pkt_queues[i].len != 0) { + release_packet_queue(&pdev->entity.pcap_file_dev_ctx->pkt_queues[i]); + } + } + + FREE(pdev->entity.pcap_file_dev_ctx); + + return 0; +} + +int packet_copy_data_offset(struct packet *p, uint32_t offset, const uint8_t *data, uint32_t data_len) { + memcpy(GET_PKT_DIRECT_DATA(p) + offset, data, data_len); + + return 0; +} + +int packet_copy_data(struct packet *p, const uint8_t *pkt_data, uint32_t pkt_len) { + SET_PKT_LEN(p, (size_t)pkt_len); + return packet_copy_data_offset(p, 0, pkt_data, pkt_len); +} + +static void pcap_pkt_callback_oneshot(char *user, struct pcap_pkthdr *pkt_hdr, u_char *pkt, uint32_t nr_rxq, uint32_t rxq_id) +{ + struct pio_pcap_file_device_context *pfile_dev_ctx = (struct pio_pcap_file_device_context *)user; + struct packet *p = CALLOC(struct packet, 1); + if (nullptr == p) { + return; + } + + if (packet_copy_data(p, pkt, pkt_hdr->caplen)) { + FREE(p); + return; + } + + /* + hash to specific queue id and enqueue + hash_id = decode_packet(p) % nr_rxq; + packet_enqueue(&pfile_dev_ctx->pkt_queues[hash_id], p); + */ + pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); + packet_enqueue(&pfile_dev_ctx->pkt_queues[rxq_id], p); + pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); +} + +static int pcap_file_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t nr_rxq, uint32_t rxq_id, + struct packet **pkts, int nr_pkts) +{ + if (pfile_dev_ctx->entity.file->first_pkt_hdr != nullptr) { + pthread_mutex_lock(&pfile_dev_ctx->entity.file->handle_mutex); + if (pfile_dev_ctx->entity.file->first_pkt_hdr != nullptr) { + pcap_pkt_callback_oneshot((char *)pfile_dev_ctx, pfile_dev_ctx->entity.file->first_pkt_hdr, + (u_char *)pfile_dev_ctx->entity.file->first_pkt_data, nr_rxq, rxq_id); + pfile_dev_ctx->entity.file->first_pkt_hdr = nullptr; + pfile_dev_ctx->entity.file->first_pkt_data = nullptr; + } + pthread_mutex_unlock(&pfile_dev_ctx->entity.file->handle_mutex); + } + + int packet_q_len = nr_pkts; + int res = -1; + + pthread_mutex_lock(&pfile_dev_ctx->entity.file->handle_mutex); + res = pcap_dispatch(pfile_dev_ctx->entity.file->pcap_handle, packet_q_len, + (pcap_handler)pcap_pkt_callback_oneshot, (u_char *)pfile_dev_ctx); + pthread_mutex_unlock(&pfile_dev_ctx->entity.file->handle_mutex); + if (res < 0) { + log_error(ST_ERR_PCAP_DISPATCH, "error code %d %s for %s", res, + pcap_geterr(pfile_dev_ctx->entity.file->pcap_handle), pfile_dev_ctx->entity.file->file_name); + } else if (res == 0) { + log_info("reach end of pcap file %s (error code %d)", pfile_dev_ctx->entity.file->file_name, res); + //TODO: close pcap file + } else { + // success + struct packet *p = nullptr; + int i = 0; + pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); + do { + p = packet_dequeue(&pfile_dev_ctx->pkt_queues[rxq_id]); + pkts[i] = p; + i++; + } while (p != nullptr && (i < nr_pkts)); + pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); + + if (nullptr == p) { + res = i - 1; + } else { + res = nr_pkts; + } + } + + return res; +} + +static int pcap_directory_get_modified_time(char *pfile, struct timespec *out) +{ + struct stat buf; + int ret = -1; + + if (nullptr == pfile) { + return ret; + } + + if ((ret = stat(pfile, &buf)) != 0) { + return ret; + } + + out->tv_sec = buf.st_mtim.tv_sec; + out->tv_nsec = buf.st_mtim.tv_nsec; + + return ret; +} + +struct pending_file * +find_pending_file_to_add(struct pio_pcap_file_device_context *pfile_dev_ctx, struct dirent *dir, + struct timespec *deadline) +{ + char abs_path[PATH_MAX] = {0}; - /* TODO: */ - //pcap_close(pdev->entity.pcap_file_dev->pcap_handle); + snprintf(abs_path, PATH_MAX, "%s/%s", pfile_dev_ctx->entity.dir->dir_name, dir->d_name); + struct timespec temp_time; + memset(&temp_time, 0, sizeof(struct timespec)); + + if (pcap_directory_get_modified_time(abs_path, &temp_time) < 0) { + log_debug("unable to get modified time on %s, skipping", abs_path); + return nullptr; + } + + /* skip files outside of out time range */ + if (compare_timespec(&temp_time, &pfile_dev_ctx->shared.last_processed_ts) <= 0) { + log_debug("skipping old file %s", abs_path); + return nullptr; + } else if (compare_timespec(&temp_time, deadline) >= 0) { + log_debug("skipping new file %s", abs_path); + return nullptr; + } + + struct pending_file *file_to_add = CALLOC(struct pending_file, 1); + strncpy(file_to_add->file_name, abs_path, strlen(abs_path)); + copy_timespec(&temp_time, &file_to_add->modified_time); + log_info("found \"%s\" at %" PRIuMAX, file_to_add->file_name, + (uintmax_t)timespec_to_millisecond(&file_to_add->modified_time)); + + return file_to_add; +} + +static int pcap_directory_insert_file(struct pio_pcap_file_device_context *pfile_dev_ctx, struct pending_file *file_to_add) +{ + if (nullptr == pfile_dev_ctx || file_to_add) { + log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "invalid directory or file parameters."); + return -1; + } + + log_debug("inserting %s into pending file queue", file_to_add->file_name); + + if (TAILQ_EMPTY(&pfile_dev_ctx->entity.dir->file_queue_head)) { + TAILQ_INSERT_TAIL(&pfile_dev_ctx->entity.dir->file_queue_head, file_to_add, next); + } else { + struct pending_file *file_to_compare = TAILQ_FIRST(&pfile_dev_ctx->entity.dir->file_queue_head); + while (file_to_compare != nullptr) { + if (compare_timespec(&file_to_add->modified_time, &file_to_compare->modified_time) < 0) { + TAILQ_INSERT_BEFORE(file_to_compare, file_to_add, next); + file_to_compare = nullptr; + } else { + struct pending_file *next_file_to_compare = TAILQ_NEXT(file_to_compare, next); + if (next_file_to_compare == nullptr) { + TAILQ_INSERT_AFTER(&pfile_dev_ctx->entity.dir->file_queue_head, file_to_compare, file_to_add, next); + } + file_to_compare = next_file_to_compare; + } + } + } + + return 0; +} + +static int pcap_directory_collect_pending_files(struct pio_pcap_file_device_context *pfile_dev_ctx, struct timespec *deadline) +{ + if (nullptr == pfile_dev_ctx) { + return -1; + } + + if (strlen(pfile_dev_ctx->entity.dir->dir_name) == 0) { + log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "invalid directory name."); + return -1; + } + + struct dirent *dir = nullptr; + struct pending_file *file_to_add = nullptr; + + while ((dir = readdir(pfile_dev_ctx->entity.dir->directory)) != nullptr) { + /* ignore non plain file */ + if (dir->d_type != DT_REG) { + continue; + } + + /* ignore . and .. */ + if (strcmp(dir->d_name, ".") == 0 || strcmp(dir->d_name, "..") == 0) { + continue; + } + + file_to_add = find_pending_file_to_add(pfile_dev_ctx, dir, deadline); + if (nullptr == file_to_add) { + continue; + } + + if (pcap_directory_insert_file(pfile_dev_ctx, file_to_add) < 0) { + log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "failed to insert file into directory"); + FREE(file_to_add); + return -1; + } + } + + return 0; +} + +static int pcap_directory_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t nr_rxq, uint32_t rxq_id, + struct packet **pkts, int nr_pkts) +{ + int res = -1; + struct timespec deadline; + + memset(&deadline, 0, sizeof(struct timespec)); + get_current_timespec(&deadline); + /* the newest file which can be processed */ + deadline.tv_sec = deadline.tv_sec - pfile_dev_ctx->entity.dir->delay; + + /* collect pending files in current directory */ + if (pcap_directory_collect_pending_files(pfile_dev_ctx, &deadline) < 0) { + log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "failed to collect pending files in directory."); + return -1; + } + + /* file_queue is empty */ + if (TAILQ_EMPTY(&pfile_dev_ctx->entity.dir->file_queue_head)) { + log_info("directory %s has no files to process", pfile_dev_ctx->entity.dir->dir_name); + return 0; + } + + struct timespec last_time_seen; + memset(&last_time_seen, 0, sizeof(struct timespec)); + + // file_queue_head has pending files + if (nullptr == pfile_dev_ctx->entity.dir->current_file) { + /* not open file yet */ + struct pending_file *current_file = TAILQ_FIRST(&pfile_dev_ctx->entity.dir->file_queue_head); + log_info("processing file %s", current_file->file_name); + struct pcap_plain_file_info *pfile_info = CALLOC(struct pcap_plain_file_info, 1); + if (nullptr == pfile_info) { + log_error(ST_ERR_MEM_ALLOC, "alloc pcap_plain_file_info failed."); + return -1; + } + + strncpy(pfile_info->file_name, current_file->file_name, strlen(current_file->file_name)); + pfile_info->shared = &pfile_dev_ctx->shared; + + if (init_pcap_file(pfile_info) < 0) { + log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "init_pcap_file failed."); + FREE(current_file); + FREE(pfile_info); + return -1; + } else { + pfile_dev_ctx->entity.dir->current_file = pfile_info; + res = pcap_file_dispatch(pfile_dev_ctx, nr_rxq, rxq_id, pkts, nr_pkts); + if (res < 0) { + FREE(current_file); + return -1; + } + log_info("processed file %s, processed up to %" PRIuMAX, + current_file->file_name, (uintmax_t)timespec_to_millisecond(¤t_file->modified_time)); + if (compare_timespec(¤t_file->modified_time, &last_time_seen) > 0) { + copy_timespec(¤t_file->modified_time, &last_time_seen); + } + FREE(current_file); + if (res == 0) { // reach the end of the file + pfile_dev_ctx->entity.dir->current_file = nullptr; + } + } + } else { + /* file has been opened */ + res = pcap_file_dispatch(pfile_dev_ctx, nr_rxq, rxq_id, pkts, nr_pkts); + if (res < 0) { + return -1; + } + } + + if (compare_timespec(&last_time_seen, &pfile_dev_ctx->shared.last_processed_ts) > 0) { + log_info("updating processed to %" PRIuMAX, (uintmax_t)timespec_to_millisecond(&last_time_seen)); + copy_timespec(&last_time_seen, &pfile_dev_ctx->shared.last_processed_ts); + } + + return res; +} + +static int pcap_file_exit(int status, struct timespec *last_processed) +{ return 0; } int pio_pcap_file_device_receive(struct packet_io_device *pdev, uint32_t rxq_id, struct packet **pkts, int nr_pkts) { + struct pio_pcap_file_device_context *pfile_dev_ctx = pdev->entity.pcap_file_dev_ctx; + if (nullptr == pfile_dev_ctx) { + log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "invalid pcap_file_dev_ctx pointer."); + return -1; + } - return 0; + int res = -1; + if (pfile_dev_ctx->is_dir == 0) { + log_info("Start reading file:%s", pfile_dev_ctx->entity.file->file_name); + res = pcap_file_dispatch(pfile_dev_ctx, pdev->rxq_num, rxq_id, pkts, nr_pkts); + } else { + log_info("Start reading directory:%s", pfile_dev_ctx->entity.dir->dir_name); + res = pcap_directory_dispatch(pfile_dev_ctx, pdev->rxq_num, rxq_id, pkts, nr_pkts); + } + + //pcap_file_exit(status, &pfile_dev_ctx->shared.last_processed_ts); + + return res; } int pio_pcap_file_device_send(struct packet_io_device *pdev, uint32_t txq_id, struct packet **pkts, int nr_pkts) @@ -140,12 +628,24 @@ void pio_pcap_file_device_pkt_free(struct packet_io_device *pdev, uint32_t qid, int pio_pcap_file_instance_create(struct packet_io_instance *pinst, __unused int wrk_thread_num) { + if (nullptr == pinst) { + log_error(ST_ERR_PIO_PCAP_FILE_INSTANCE, "invalid pcap file instance pointer."); + return -1; + } + + pinst->entity.pcap_file_inst_ctx = CALLOC(struct pio_pcap_file_instance_context, 1); + if (nullptr == pinst->entity.pcap_file_inst_ctx) { + log_error(ST_ERR_PIO_PCAP_FILE_INSTANCE, "alloc pcap_file_inst_ctx failed."); + return -1; + } return 0; } void pio_pcap_file_instance_destroy(struct packet_io_instance *pinst) { + FREE(pinst->entity.pcap_file_inst_ctx); + for (uint32_t i = 0; i < pinst->dev_cnt; i++) { pio_pcap_file_device_close(pinst->devices[i]); FREE(pinst->devices[i]); diff --git a/src/packet_io/pcap_file_mode/pio_pcap_file.h b/src/packet_io/pcap_file_mode/pio_pcap_file.h index bbcb86a..e51c9d6 100644 --- a/src/packet_io/pcap_file_mode/pio_pcap_file.h +++ b/src/packet_io/pcap_file_mode/pio_pcap_file.h @@ -12,9 +12,13 @@ #include #include -#include #include +#include "../../common/global_var.h" +#include "../../common/packet_queue.h" + +#define PKT_QUEUE_MAX_NUM 256 + struct pio_pcap_file_instance_context { }; @@ -22,14 +26,14 @@ struct pio_pcap_file_instance_context { struct pending_file { char file_name[NAME_MAX]; struct timespec modified_time; - TAILQ_ENTRY(pending_file) entry; + TAILQ_ENTRY(pending_file) next; }; struct pcap_file_shared_info { - /* reserved for bpf filter rules */ - char *bpf_string; + /* bpf filter string, such as "tcp and port 25"*/ + char bpf_string[STR_MAX_LEN]; - /* if true which means pcap file will be deleted after processed */ + /* delete after the pcap file is read */ bool should_delete; /* the timestamp of the last process */ @@ -47,6 +51,7 @@ struct pcap_file_shared_info { struct pcap_plain_file_info { char file_name[NAME_MAX]; pcap_t *pcap_handle; + pthread_mutex_t handle_mutex; int data_link; struct bpf_program filter; @@ -66,11 +71,11 @@ struct pcap_file_directory_info { struct pcap_plain_file_info *current_file; /* whether to loop through the pcap files in the directory */ - bool should_loop; + //bool should_loop; time_t delay; /* poll pcap file interval */ - time_t poll_interval; + //time_t poll_interval; /* the pending files queue for the specific directory */ TAILQ_HEAD(pending_file_queue, pending_file) file_queue_head; @@ -78,6 +83,9 @@ struct pcap_file_directory_info { struct pcap_file_shared_info *shared; }; +/* + * @brief pio_pcap_file_device_context - pcap file device context +**/ struct pio_pcap_file_device_context { union { struct pcap_file_directory_info *dir; @@ -85,18 +93,22 @@ struct pio_pcap_file_device_context { } entity; bool is_dir; + + /* rx packet queue */ + struct packet_queue pkt_queues[PKT_QUEUE_MAX_NUM]; + struct pcap_file_shared_info shared; }; /* * @brief open pcap_file device * - * @param pdev: pcap_file device's pointer - * @param dev_name: the path of pcap file - * @param nr_rxq: number of the packet receiving queues for the device - * @param nr_txq: number of the packet sending queues for the device + * @param pdev: pcap_file device's pointer which support following params + * pdev->dev_name: the name of pcap file + * pdev->rxq_num: number of the packet receiving queues for the device + * pdev->txq_num: number of the packet sending queues for the device **/ -int pio_pcap_file_device_open(struct packet_io_device *pdev, const char *path, uint32_t nr_rxq, uint32_t nr_txq); +int pio_pcap_file_device_open(struct packet_io_device *pdev); /* * @brief close pcap_live device diff --git a/src/packet_io/pcap_live_mode/pio_pcap_live.cpp b/src/packet_io/pcap_live_mode/pio_pcap_live.cpp index 9b1b212..a02c2b7 100644 --- a/src/packet_io/pcap_live_mode/pio_pcap_live.cpp +++ b/src/packet_io/pcap_live_mode/pio_pcap_live.cpp @@ -12,7 +12,7 @@ #include "../packet_io.h" #include "../../sdk/include/utils.h" -int pio_pcap_live_device_open(struct packet_io_device *pdev, const char *dev_name, uint32_t nr_rxq, uint32_t nr_txq) { +int pio_pcap_live_device_open(struct packet_io_device *pdev) { return 0; } diff --git a/src/packet_io/pcap_live_mode/pio_pcap_live.h b/src/packet_io/pcap_live_mode/pio_pcap_live.h index f1b7ac2..dd6e1d8 100644 --- a/src/packet_io/pcap_live_mode/pio_pcap_live.h +++ b/src/packet_io/pcap_live_mode/pio_pcap_live.h @@ -17,6 +17,9 @@ struct pio_pcap_live_instance_context { }; +/* + * @brief pio_pcap_file_device_context - pcap file device context +**/ struct pio_pcap_live_device_context { pcap_t *pcap_handle; }; @@ -24,12 +27,12 @@ struct pio_pcap_live_device_context { /* * @brief open pcap_live device * - * @param pdev: pcap_live device's pointer - * @param dev_name: device name, such as eth1, eth2 ... - * @param nr_rxq: number of the packet receiving queues for the device - * @param nr_txq: number of the packet sending queues for the device + * @param pdev: pcap_live device's pointer which support following params + * pdev->dev_name: the name of pcap device, such as eth1, eth2, ... + * pdev->rxq_num: number of the packet receiving queues for the device + * pdev->txq_num: number of the packet sending queues for the device **/ -int pio_pcap_live_device_open(struct packet_io_device *pdev, const char *dev_name, uint32_t nr_rxq, uint32_t nr_txq); +int pio_pcap_live_device_open(struct packet_io_device *pdev); /* * @brief close pcap_live device diff --git a/src/packet_io/test/CMakeLists.txt b/src/packet_io/test/CMakeLists.txt index 3b8b652..4f83389 100644 --- a/src/packet_io/test/CMakeLists.txt +++ b/src/packet_io/test/CMakeLists.txt @@ -7,6 +7,7 @@ target_link_libraries( gtest_main packet_io dl + pcap ) include(GoogleTest)