diff --git a/sdk/include/logger.h b/sdk/include/logger.h index 3011197..81a650b 100644 --- a/sdk/include/logger.h +++ b/sdk/include/logger.h @@ -15,9 +15,9 @@ extern "C" { #endif -#define log_debug(x, ...) +#define log_debug(x, format, ...) -#define log_info(x, ...) +#define log_info(x, format, ...) #define log_notice(x, ...) diff --git a/src/app.toml b/src/app.toml index a801e28..28accdc 100644 --- a/src/app.toml +++ b/src/app.toml @@ -15,7 +15,7 @@ # example3: RUN_MODE="PCAP_FILE_MODE" WORKER_THREAD_NUM=2 # Prompt marsio how many threads to start to receive packets -PCAP_FILE_PATH="/home/liuwentan/project/stellar/pcap_test" # if single file, specify dir+filename; if pcapfile directory, specify dir +PCAP_FILE_PATH="/home/liuwentan/project/stellar/stellar/src/packet_io/test/pcap_sample" # if single file, specify dir+filename; if pcapfile directory, specify dir DELETE_WHEN_DONE=0 # 0(false) 1(true), default 0, if delete it when the pcapfile is processed # BPF_FILTER="port 80 and udp" # default null diff --git a/src/main.cpp b/src/main.cpp index ea71832..3c792a0 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -26,6 +26,7 @@ #include "http.h" #include "utils.h" #include "util_errors.h" +#include "packet_io_internal.h" struct worker_thread_ctx { diff --git a/src/packet_io/pcap_file_mode/pio_pcap_file.cpp b/src/packet_io/pcap_file_mode/pio_pcap_file.cpp index e57d3bf..7fec9a4 100644 --- a/src/packet_io/pcap_file_mode/pio_pcap_file.cpp +++ b/src/packet_io/pcap_file_mode/pio_pcap_file.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include "utils.h" #include "util_errors.h" @@ -25,6 +26,8 @@ #include "packet_io_util.h" #include "packet_io_internal.h" +#define MAX_RECV_BURST 64 + struct safe_pending_file_queue g_pending_file_queue; /** @@ -63,26 +66,6 @@ static ssize_t validate_directory_or_file(const char *path, DIR **dir) return ret; } -/** - * @brief get the timestamp of the first packet and rewind - * - * @retval true(success), false(error) - */ -static bool peek_first_packet_timestamp(struct pcap_plain_file_info *pfile_info) -{ - int ret = pcap_next_ex(pfile_info->pcap_handle, &pfile_info->first_pkt_hdr, - &pfile_info->first_pkt_data); - if (ret <= 0 || (nullptr == pfile_info->first_pkt_hdr)) { - log_error(ST_ERR_PCAP_OPEN_OFFLINE, "failed to get first packet timestamp"); - return false; - } - - pfile_info->first_pkt_ts.tv_sec = pfile_info->first_pkt_hdr->ts.tv_sec; - pfile_info->first_pkt_ts.tv_usec = pfile_info->first_pkt_hdr->ts.tv_usec; - - return true; -} - static ssize_t init_pcap_file(struct pcap_plain_file_info *pfile_info) { char errbuf[PCAP_ERRBUF_SIZE] = ""; @@ -118,9 +101,6 @@ static ssize_t init_pcap_file(struct pcap_plain_file_info *pfile_info) } #endif pfile_info->data_link = pcap_datalink(pfile_info->pcap_handle); - if (!peek_first_packet_timestamp(pfile_info)) { - return -1; - } return 0; } @@ -344,7 +324,8 @@ void pcap_file_pkt_callback_oneshot(char *user, struct pcap_pkthdr *pkt_hdr, u_c p->pkt_hdr = p; p->pkt_payload = (uint8_t *)p + CUSTOM_ZONE_LEN; p->pkt_len = pkt_hdr->caplen; - p->data_link = pfile_dev_ctx->entity.file->data_link; + //p->data_link = pfile_dev_ctx->entity.file->data_link; + p->data_link = 1; if (packet_copy_data((uint8_t *)p->pkt_payload, (uint8_t *)pkt, pkt_hdr->caplen)) { FREE(p); @@ -361,80 +342,55 @@ void pcap_file_pkt_callback_oneshot(char *user, struct pcap_pkthdr *pkt_hdr, u_c pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); } -static ssize_t pcap_directory_file_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id, - struct stellar_packet **pkts, size_t nr_pkts) +static ssize_t pcap_directory_file_pkts_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id) { - if (pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr != nullptr) { - pthread_mutex_lock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex); - if (pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr != nullptr) { - pcap_file_pkt_callback_oneshot((char *)pfile_dev_ctx, pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr, - (u_char *)pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_data); - pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr = nullptr; - pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_data = nullptr; - } - pthread_mutex_unlock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex); - } - - int packet_q_len = nr_pkts; + int packet_q_len = MAX_RECV_BURST; ssize_t res = -1; - pthread_mutex_lock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex); res = pcap_dispatch(pfile_dev_ctx->entity.dir->current_file[rxq_id]->pcap_handle, packet_q_len, (pcap_handler)pcap_file_pkt_callback_oneshot, (u_char *)pfile_dev_ctx); - pthread_mutex_unlock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex); if (res < 0) { log_error(ST_ERR_PCAP_DISPATCH, "error code %ld %s for %s", res, pcap_geterr(pfile_dev_ctx->entity.dir->current_file[rxq_id]->pcap_handle), pfile_dev_ctx->entity.dir->current_file[rxq_id]->file_name); } else if (res == 0) { log_info("reach end of pcap file %s (error code %d)", pfile_dev_ctx->entity.dir->current_file[rxq_id]->file_name, res); - //TODO: close pcap file - } else { - // success - struct pio_packet *p = nullptr; - size_t i = 0; - uint32_t q_len = 0; - pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); - - /* if pkt queue is empty */ - if (pfile_dev_ctx->pkt_queues[rxq_id].len == 0) { - pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); - return 0; - } - - do { - p = pio_packet_dequeue(&pfile_dev_ctx->pkt_queues[rxq_id]); - pkts[i] = (struct stellar_packet *)p; - q_len = pfile_dev_ctx->pkt_queues[rxq_id].len; - i++; - } while ((q_len != 0) && (i < nr_pkts)); - pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); - - if (q_len == 0) { - res = i; - } else { - res = nr_pkts; - } } return res; } -static ssize_t pcap_file_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id, - struct stellar_packet **pkts, size_t nr_pkts) +static ssize_t get_pcap_file_pkts(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id, + struct stellar_packet **pkts, size_t nr_pkts) { - if (pfile_dev_ctx->entity.file->first_pkt_hdr != nullptr) { - pthread_mutex_lock(&pfile_dev_ctx->entity.file->handle_mutex); - if (pfile_dev_ctx->entity.file->first_pkt_hdr != nullptr) { - pcap_file_pkt_callback_oneshot((char *)pfile_dev_ctx, pfile_dev_ctx->entity.file->first_pkt_hdr, - (u_char *)pfile_dev_ctx->entity.file->first_pkt_data); - pfile_dev_ctx->entity.file->first_pkt_hdr = nullptr; - pfile_dev_ctx->entity.file->first_pkt_data = nullptr; - } - pthread_mutex_unlock(&pfile_dev_ctx->entity.file->handle_mutex); + ssize_t res = -1; + size_t i = 0; + uint32_t q_len = 0; + struct pio_packet *p = nullptr; + + pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); + if (pfile_dev_ctx->pkt_queues[rxq_id].len > 0) { + do { + p = pio_packet_dequeue(&pfile_dev_ctx->pkt_queues[rxq_id]); + q_len = pfile_dev_ctx->pkt_queues[rxq_id].len; + pkts[i] = (struct stellar_packet *)p; + i++; + } while ((q_len != 0) && (i < nr_pkts)); + } + pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); + + if (q_len == 0) { + res = i; + } else { + res = nr_pkts; } - int packet_q_len = nr_pkts; + return res; +} + +static ssize_t pcap_file_pkts_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx) +{ + int packet_q_len = MAX_RECV_BURST; ssize_t res = -1; pthread_mutex_lock(&pfile_dev_ctx->entity.file->handle_mutex); @@ -444,31 +400,8 @@ static ssize_t pcap_file_dispatch(struct pio_pcap_file_device_context *pfile_dev if (res < 0) { log_error(ST_ERR_PCAP_DISPATCH, "error code %ld %s for %s", res, pcap_geterr(pfile_dev_ctx->entity.file->pcap_handle), pfile_dev_ctx->entity.file->file_name); - } else { - if (res == 0) { - log_info("reach end of pcap file %s (error code %d)", pfile_dev_ctx->entity.file->file_name, res); - } - - // success - struct pio_packet *p = nullptr; - size_t i = 0; - uint32_t q_len = 0; - pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); - if (pfile_dev_ctx->pkt_queues[rxq_id].len != 0) { - do { - p = pio_packet_dequeue(&pfile_dev_ctx->pkt_queues[rxq_id]); - q_len = pfile_dev_ctx->pkt_queues[rxq_id].len; - pkts[i] = (struct stellar_packet *)p; - i++; - } while ((q_len != 0) && (i < nr_pkts)); - } - pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q); - - if (q_len == 0) { - res = i; - } else { - res = nr_pkts; - } + } else if (res == 0) { + log_info("reach end of pcap file %s (error code %d)", pfile_dev_ctx->entity.file->file_name, res); } return res; @@ -569,6 +502,197 @@ static ssize_t pcap_directory_insert_file(struct pending_file *file_to_add) return 0; } +#define SWAPLONG(y) \ + (((((u_int)(y))&0xff)<<24) | \ + ((((u_int)(y))&0xff00)<<8) | \ + ((((u_int)(y))&0xff0000)>>8) | \ + ((((u_int)(y))>>24)&0xff)) + +#define SWAPSHORT(y) \ + ((u_short)(((((u_int)(y))&0xff)<<8) | \ + ((((u_int)(y))&0xff00)>>8))) + +/* Standard libpcap format. */ +#define TCPDUMP_MAGIC 0xa1b2c3d4 + +/* Alexey Kuznetzov's modified libpcap format. */ +#define KUZNETZOV_TCPDUMP_MAGIC 0xa1b2cd34 + +/* + * Normal libpcap format, except for seconds/nanoseconds timestamps, + * as per a request by Ulf Lamping + */ +#define NSEC_TCPDUMP_MAGIC 0xa1b23c4d + +#define LT_RESERVED1(x) ((x) & 0x03FF0000) + +static ssize_t pcap_header_check(const uint8_t *magic, FILE *fp) +{ + int swapped = 0; + size_t amt_read; + uint32_t magic_int; + struct pcap_file_header hdr; + + /* + * Check whether the first 4 bytes of the file are the magic + * number for a pcap savefile, or for a byte-swapped pcap + * savefile. + */ + memcpy(&magic_int, magic, sizeof(magic_int)); + if (magic_int != TCPDUMP_MAGIC && magic_int != KUZNETZOV_TCPDUMP_MAGIC && magic_int != NSEC_TCPDUMP_MAGIC) { + magic_int = SWAPLONG(magic_int); + if (magic_int != TCPDUMP_MAGIC && magic_int != KUZNETZOV_TCPDUMP_MAGIC && magic_int != NSEC_TCPDUMP_MAGIC) { + return -1; + } + swapped = 1; + } + + /* + * They are. Put the magic number in the header, and read + * the rest of the header. + */ + hdr.magic = magic_int; + amt_read = fread(((char *)&hdr) + sizeof hdr.magic, 1, + sizeof(hdr) - sizeof(hdr.magic), fp); + if (amt_read != sizeof(hdr) - sizeof(hdr.magic)) { + return -1; + } + + /* + * If it's a byte-swapped capture file, byte-swap the header. + */ + if (swapped) { + hdr.version_major = SWAPSHORT(hdr.version_major); + hdr.version_minor = SWAPSHORT(hdr.version_minor); + hdr.thiszone = SWAPLONG(hdr.thiszone); + hdr.sigfigs = SWAPLONG(hdr.sigfigs); + hdr.snaplen = SWAPLONG(hdr.snaplen); + hdr.linktype = SWAPLONG(hdr.linktype); + } + + if (hdr.version_major < PCAP_VERSION_MAJOR) { + return -1; + } + + /* + * currently only versions 2.[0-4] are supported with + * the exception of 543.0 for DG/UX tcpdump. + */ + if (! ((hdr.version_major == PCAP_VERSION_MAJOR && + hdr.version_minor <= PCAP_VERSION_MINOR) || + (hdr.version_major == 543 && hdr.version_minor == 0))) { + return -1; + } + + /* + * Check the main reserved field. + */ + if (LT_RESERVED1(hdr.linktype) != 0) { + return -1; + } + + return 0; +} + + +#define BT_SHB 0x0A0D0D0A +#define BT_SHB_INSANE_MAX 1024U*1024U*1U /* 1MB should be enough */ +#define BYTE_ORDER_MAGIC 0x1A2B3C4D +struct section_header_block { + uint32_t byte_order_magic; + u_short major_version; + u_short minor_version; + uint64_t section_length; + /* followed by options and trailer */ +}; + +struct block_header { + uint32_t block_type; + uint32_t total_length; +}; + +static ssize_t pcapng_header_check(const uint8_t *magic, FILE *fp) +{ + uint32_t magic_int; + uint32_t total_length; + uint32_t byte_order_magic; + size_t amt_read; + struct block_header *bhdrp; + struct section_header_block *shbp; + + /* + * Check whether the first 4 bytes of the file are the block + * type for a pcapng savefile. + */ + memcpy(&magic_int, magic, sizeof(magic_int)); + if (magic_int != BT_SHB) { + return -1; + } + + /* + * OK, they are. However, that's just \n\r\r\n, so it could, + * conceivably, be an ordinary text file. + * + * It could not, however, conceivably be any other type of + * capture file, so we can read the rest of the putative + * Section Header Block; put the block type in the common + * header, read the rest of the common header and the + * fixed-length portion of the SHB, and look for the byte-order + * magic value. + */ + amt_read = fread(&total_length, 1, sizeof(total_length), fp); + if (amt_read < sizeof(total_length)) { + return -1; + } + + amt_read = fread(&byte_order_magic, 1, sizeof(byte_order_magic), fp); + if (amt_read < sizeof(byte_order_magic)) { + return -1; + } + if (byte_order_magic != BYTE_ORDER_MAGIC) { + byte_order_magic = SWAPLONG(byte_order_magic); + if (byte_order_magic != BYTE_ORDER_MAGIC) { + return -1; + } + total_length = SWAPLONG(total_length); + } + + /* + * Check the sanity of the total length. + */ + if (total_length < sizeof(*bhdrp) + sizeof(*shbp) + sizeof(total_length) || + (total_length > BT_SHB_INSANE_MAX)) { + return -1; + } + + return 0; +} + +static ssize_t validate_pcap_file(struct pio_pcap_file_device_context *pfile_dev_ctx, const char *file_name) +{ + uint8_t magic[4]; + char abs_path[PATH_MAX] = {0}; + + snprintf(abs_path, sizeof(abs_path), "%s/%s", pfile_dev_ctx->entity.dir->dir_name, file_name); + FILE *fp = fopen(abs_path, "r"); + if (nullptr == fp) { + return -1; + } + + size_t ret = fread(&magic, 1, sizeof(magic), fp); + if (ret != sizeof(magic)) { + return -1; + } + + if (pcap_header_check(magic, fp) == 0) { + return 0; + } else if (pcapng_header_check(magic, fp) == 0) { + return 0; + } + + return -1; +} + static ssize_t pcap_directory_collect_pending_files(struct pio_pcap_file_device_context *pfile_dev_ctx, struct timespec *deadline) { if (nullptr == pfile_dev_ctx) { @@ -594,6 +718,11 @@ static ssize_t pcap_directory_collect_pending_files(struct pio_pcap_file_device_ continue; } + /* ignore non pcap file */ + if (validate_pcap_file(pfile_dev_ctx, dir->d_name) < 0) { + continue; + } + file_to_add = find_pending_file_to_add(pfile_dev_ctx, dir, deadline); if (nullptr == file_to_add) { continue; @@ -609,14 +738,14 @@ static ssize_t pcap_directory_collect_pending_files(struct pio_pcap_file_device_ return 0; } -static ssize_t pcap_directory_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id, - struct stellar_packet **pkts, size_t nr_pkts) +static ssize_t pcap_directory_files_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id) { ssize_t res = -1; struct timespec deadline; memset(&deadline, 0, sizeof(struct timespec)); get_current_timespec(&deadline); + /* the newest file which can be processed */ deadline.tv_sec = deadline.tv_sec - pfile_dev_ctx->entity.dir->delay; @@ -632,6 +761,7 @@ static ssize_t pcap_directory_dispatch(struct pio_pcap_file_device_context *pfil /* not open file yet */ if (nullptr == pfile_dev_ctx->entity.dir->current_file[rxq_id]) { pthread_mutex_lock(&g_pending_file_queue.queue_mutex); + /* file_queue is empty */ if (TAILQ_EMPTY(&g_pending_file_queue.file_queue_head)) { pthread_mutex_unlock(&g_pending_file_queue.queue_mutex); @@ -654,24 +784,24 @@ static ssize_t pcap_directory_dispatch(struct pio_pcap_file_device_context *pfil res = strncpy_safe(pfile_info->file_name, current_file->file_name, sizeof(pfile_info->file_name)); if (res < 0) { - log_error(ST_ERR_STR_COPY, "pfile_info file name copy failed."); - FREE(current_file); FREE(pfile_info); + FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); + log_error(ST_ERR_STR_COPY, "pfile_info file name copy failed."); return -1; } pfile_info->shared = &pfile_dev_ctx->shared; if (init_pcap_file(pfile_info) < 0) { - log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "init_pcap_file failed."); - FREE(current_file); FREE(pfile_info); + FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); + log_error(ST_ERR_PIO_PCAP_FILE_DEVICE, "init_pcap_file failed."); return -1; - } else { + } else { pfile_dev_ctx->entity.dir->current_file[rxq_id] = pfile_info; - res = pcap_directory_file_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts); + res = pcap_directory_file_pkts_dispatch(pfile_dev_ctx, rxq_id); if (res < 0) { - FREE(current_file); + FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); return -1; } log_info("processed file %s, processed up to %" PRIuMAX, @@ -684,13 +814,14 @@ static ssize_t pcap_directory_dispatch(struct pio_pcap_file_device_context *pfil if (res == 0) { // reach the end of the file cleanup_pcap_plain_file_info(pfile_dev_ctx->entity.dir->current_file[rxq_id]); FREE(pfile_dev_ctx->entity.dir->current_file[rxq_id]); - FREE(current_file); + FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); } } } else { /* file has been opened */ - res = pcap_directory_file_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts); + res = pcap_directory_file_pkts_dispatch(pfile_dev_ctx, rxq_id); if (res < 0) { + FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]); return -1; } else if (res == 0) { cleanup_pcap_plain_file_info(pfile_dev_ctx->entity.dir->current_file[rxq_id]); @@ -718,10 +849,20 @@ ssize_t pio_pcap_file_device_receive(struct packet_io_device *pdev, uint32_t rxq ssize_t res = -1; if (pfile_dev_ctx->is_dir == 0) { log_info("Start reading file:%s", pfile_dev_ctx->entity.file->file_name); - res = pcap_file_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts); + + /* read pcap file and dispatch pkts to pkt_queue */ + pcap_file_pkts_dispatch(pfile_dev_ctx); + + /* get pkts from pkt_queue */ + res = get_pcap_file_pkts(pfile_dev_ctx, rxq_id, pkts, nr_pkts); } else { log_info("Start reading directory:%s", pfile_dev_ctx->entity.dir->dir_name); - res = pcap_directory_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts); + + /* read directory pcap files and dispatch pkts to pkt_queue */ + pcap_directory_files_dispatch(pfile_dev_ctx, rxq_id); + + /* get pkts from pkt_queue[rxq_id] */ + res = get_pcap_file_pkts(pfile_dev_ctx, rxq_id, pkts, nr_pkts); } return res; diff --git a/src/packet_io/pcap_file_mode/pio_pcap_file.h b/src/packet_io/pcap_file_mode/pio_pcap_file.h index c02caec..24641e4 100644 --- a/src/packet_io/pcap_file_mode/pio_pcap_file.h +++ b/src/packet_io/pcap_file_mode/pio_pcap_file.h @@ -65,11 +65,6 @@ struct pcap_plain_file_info { int data_link; struct bpf_program filter; - /* get the first packet timestamp */ - const u_char *first_pkt_data; - struct pcap_pkthdr *first_pkt_hdr; - struct timeval first_pkt_ts; - struct pcap_file_shared_info *shared; }; diff --git a/src/packet_io/pcap_live_mode/pio_pcap_live.cpp b/src/packet_io/pcap_live_mode/pio_pcap_live.cpp index 3b7f2a8..cb60221 100644 --- a/src/packet_io/pcap_live_mode/pio_pcap_live.cpp +++ b/src/packet_io/pcap_live_mode/pio_pcap_live.cpp @@ -212,23 +212,21 @@ ssize_t pio_pcap_live_device_receive(struct packet_io_device *pdev, uint32_t rxq if (res < 0) { log_error(ST_ERR_PCAP_DISPATCH, "error code %ld %s", res, pcap_geterr(plive_dev_ctx->pcap_handle)); - } else if (res == 0) { - } else { struct pio_packet *p = nullptr; size_t i = 0; uint32_t q_len = 0; + pthread_mutex_lock(&plive_dev_ctx->pkt_queues[rxq_id].mutex_q); if (plive_dev_ctx->pkt_queues[rxq_id].len > 0) { - pthread_mutex_lock(&plive_dev_ctx->pkt_queues[rxq_id].mutex_q); do { p = pio_packet_dequeue(&plive_dev_ctx->pkt_queues[rxq_id]); q_len = plive_dev_ctx->pkt_queues[rxq_id].len; pkts[i] = (struct stellar_packet *)p; i++; } while ((q_len != 0) && (i < nr_pkts)); - pthread_mutex_unlock(&plive_dev_ctx->pkt_queues[rxq_id].mutex_q); } + pthread_mutex_unlock(&plive_dev_ctx->pkt_queues[rxq_id].mutex_q); if (q_len == 0) { res = i; diff --git a/src/packet_io/test/gtest_packet_io.cpp b/src/packet_io/test/gtest_packet_io.cpp index 0d0eb47..d3eae75 100644 --- a/src/packet_io/test/gtest_packet_io.cpp +++ b/src/packet_io/test/gtest_packet_io.cpp @@ -458,7 +458,7 @@ TEST(PACKET_IO_PIO_PCAP_LIVE_Test, pio_pcap_live_device_open_and_close) { FREE(pio_instance); } -TEST(PACKET_IO_PIO_PCAP_LIVE_Test, pio_pcap_live_device_send) { +TEST(PACKET_IO_PIO_PCAP_LIVE_Test, pio_pcap_live_device_receive_and_send) { struct packet_io_instance *ppio_inst_file = packet_io_instance_create("stellar_file", PACKET_IO_RUN_MODE_PCAP_FILE); EXPECT_NE(ppio_inst_file, nullptr); struct packet_io_instance *ppio_inst_live = packet_io_instance_create("stellar_live", PACKET_IO_RUN_MODE_PCAP_LIVE); @@ -475,6 +475,10 @@ TEST(PACKET_IO_PIO_PCAP_LIVE_Test, pio_pcap_live_device_send) { ssize_t res = packet_io_device_tx(pdev_live, 0, tx_pkts, 64); EXPECT_EQ(res, 0); + struct stellar_packet *rx_pkts[64]; + res = packet_io_device_rx(pdev_live, 0, rx_pkts, 64); + EXPECT_EQ(res, 0); + packet_io_device_close(pdev_file); packet_io_device_close(pdev_live); packet_io_fini(ppio_inst_file);