[PACKET_IO]fix pcap file mode counter bug

This commit is contained in:
liuwentan
2022-08-22 16:00:09 +08:00
parent 90b359ed40
commit c461ca3837
11 changed files with 167 additions and 63 deletions

View File

@@ -14,6 +14,7 @@
#include <unistd.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include "utils.h"
#include "util_errors.h"
@@ -24,6 +25,8 @@
#include "packet_io_util.h"
#include "packet_io_internal.h"
struct safe_pending_file_queue g_pending_file_queue;
/**
* @brief validate path is a valid plain file or directory
*
@@ -96,7 +99,7 @@ static ssize_t init_pcap_file(struct pcap_plain_file_info *pfile_info)
}
pthread_mutex_init(&pfile_info->handle_mutex, nullptr);
#if 0
if (pfile_info->shared != nullptr && pfile_info->shared->bpf_string != nullptr) {
if (pcap_compile(pfile_info->pcap_handle, &pfile_info->filter,
pfile_info->shared->bpf_string, 1, 0) < 0) {
@@ -113,7 +116,7 @@ static ssize_t init_pcap_file(struct pcap_plain_file_info *pfile_info)
}
pcap_freecode(&pfile_info->filter);
}
#endif
pfile_info->data_link = pcap_datalink(pfile_info->pcap_handle);
if (!peek_first_packet_timestamp(pfile_info)) {
return -1;
@@ -178,11 +181,14 @@ static ssize_t pcap_directory_file_init(struct pio_pcap_file_device_context *pfi
pdir_info->delay = 30;
pdir_info->shared = &pfile_dev_ctx->shared;
pdir_info->directory = directory;
TAILQ_INIT(&pdir_info->file_queue_head);
pfile_dev_ctx->is_dir = 1;
pfile_dev_ctx->entity.dir = pdir_info;
/* init global pending file queue */
TAILQ_INIT(&g_pending_file_queue.file_queue_head);
pthread_mutex_init(&g_pending_file_queue.queue_mutex, nullptr);
return 0;
}
@@ -207,8 +213,7 @@ static ssize_t pcap_file_shared_init(struct pio_pcap_file_device_context *pfile_
pfile_dev_ctx->shared.should_delete = g_packet_io_config.pcap.should_delete;
/* init pcap file device packet queue */
for (uint32_t i = 0; i < PKT_QUEUE_MAX_NUM; i++) {
for (uint32_t i = 0; i < VIRTUAL_QUEUE_MAX_NUM; i++) {
pio_packet_queue_init(&pfile_dev_ctx->pkt_queues[i]);
}
@@ -305,7 +310,7 @@ ssize_t pio_pcap_file_device_close(struct packet_io_device *pdev)
cleanup_pcap_directory_info(pdev->entity.pcap_file_dev_ctx->entity.dir);
}
for (uint32_t i = 0; i < PKT_QUEUE_MAX_NUM; i++) {
for (uint32_t i = 0; i < VIRTUAL_QUEUE_MAX_NUM; i++) {
if (pdev->entity.pcap_file_dev_ctx->pkt_queues[i].len != 0) {
release_pio_packet_queue(&pdev->entity.pcap_file_dev_ctx->pkt_queues[i]);
}
@@ -344,7 +349,7 @@ void pcap_file_pkt_callback_oneshot(char *user, struct pcap_pkthdr *pkt_hdr, u_c
return;
}
/* nr_rxq <= PKT_QUEUE_MAX_NUM */
/* nr_rxq <= VIRTUAL_QUEUE_MAX_NUM */
uint16_t nr_rxq = pfile_dev_ctx->pdev->rxq_num;
uint16_t rxq_id = pio_packet_hash(p) % nr_rxq;
@@ -354,6 +359,65 @@ void pcap_file_pkt_callback_oneshot(char *user, struct pcap_pkthdr *pkt_hdr, u_c
pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q);
}
static ssize_t pcap_directory_file_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id,
struct stellar_packet **pkts, size_t nr_pkts)
{
if (pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr != nullptr) {
pthread_mutex_lock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex);
if (pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr != nullptr) {
pcap_file_pkt_callback_oneshot((char *)pfile_dev_ctx, pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr,
(u_char *)pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_data);
pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_hdr = nullptr;
pfile_dev_ctx->entity.dir->current_file[rxq_id]->first_pkt_data = nullptr;
}
pthread_mutex_unlock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex);
}
int packet_q_len = nr_pkts;
ssize_t res = -1;
pthread_mutex_lock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex);
res = pcap_dispatch(pfile_dev_ctx->entity.dir->current_file[rxq_id]->pcap_handle, packet_q_len,
(pcap_handler)pcap_file_pkt_callback_oneshot, (u_char *)pfile_dev_ctx);
pthread_mutex_unlock(&pfile_dev_ctx->entity.dir->current_file[rxq_id]->handle_mutex);
if (res < 0) {
log_error(ST_ERR_PCAP_DISPATCH, "error code %ld %s for %s", res,
pcap_geterr(pfile_dev_ctx->entity.dir->current_file[rxq_id]->pcap_handle),
pfile_dev_ctx->entity.dir->current_file[rxq_id]->file_name);
} else if (res == 0) {
log_info("reach end of pcap file %s (error code %d)", pfile_dev_ctx->entity.dir->current_file[rxq_id]->file_name, res);
//TODO: close pcap file
} else {
// success
struct pio_packet *p = nullptr;
size_t i = 0;
uint32_t q_len = 0;
pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q);
/* if pkt queue is empty */
if (pfile_dev_ctx->pkt_queues[rxq_id].len == 0) {
pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q);
return 0;
}
do {
p = pio_packet_dequeue(&pfile_dev_ctx->pkt_queues[rxq_id]);
pkts[i] = (struct stellar_packet *)p;
q_len = pfile_dev_ctx->pkt_queues[rxq_id].len;
i++;
} while ((q_len != 0) && (i < nr_pkts));
pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q);
if (q_len == 0) {
res = i;
} else {
res = nr_pkts;
}
}
return res;
}
static ssize_t pcap_file_dispatch(struct pio_pcap_file_device_context *pfile_dev_ctx, uint32_t rxq_id,
struct stellar_packet **pkts, size_t nr_pkts)
{
@@ -376,23 +440,26 @@ static ssize_t pcap_file_dispatch(struct pio_pcap_file_device_context *pfile_dev
(pcap_handler)pcap_file_pkt_callback_oneshot, (u_char *)pfile_dev_ctx);
pthread_mutex_unlock(&pfile_dev_ctx->entity.file->handle_mutex);
if (res < 0) {
log_error(ST_ERR_PCAP_DISPATCH, "error code %d %s for %s", res,
log_error(ST_ERR_PCAP_DISPATCH, "error code %ld %s for %s", res,
pcap_geterr(pfile_dev_ctx->entity.file->pcap_handle), pfile_dev_ctx->entity.file->file_name);
} else if (res == 0) {
log_info("reach end of pcap file %s (error code %d)", pfile_dev_ctx->entity.file->file_name, res);
//TODO: close pcap file
} else {
if (res == 0) {
log_info("reach end of pcap file %s (error code %d)", pfile_dev_ctx->entity.file->file_name, res);
}
// success
struct pio_packet *p = nullptr;
size_t i = 0;
uint32_t q_len = 0;
pthread_mutex_lock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q);
do {
p = pio_packet_dequeue(&pfile_dev_ctx->pkt_queues[rxq_id]);
q_len = pfile_dev_ctx->pkt_queues[rxq_id].len;
pkts[i] = (struct stellar_packet *)p;
i++;
} while ((q_len != 0) && (i < nr_pkts));
if (pfile_dev_ctx->pkt_queues[rxq_id].len != 0) {
do {
p = pio_packet_dequeue(&pfile_dev_ctx->pkt_queues[rxq_id]);
q_len = pfile_dev_ctx->pkt_queues[rxq_id].len;
pkts[i] = (struct stellar_packet *)p;
i++;
} while ((q_len != 0) && (i < nr_pkts));
}
pthread_mutex_unlock(&pfile_dev_ctx->pkt_queues[rxq_id].mutex_q);
if (q_len == 0) {
@@ -462,32 +529,40 @@ find_pending_file_to_add(struct pio_pcap_file_device_context *pfile_dev_ctx, str
return file_to_add;
}
static ssize_t pcap_directory_insert_file(struct pio_pcap_file_device_context *pfile_dev_ctx, struct pending_file *file_to_add)
static ssize_t pcap_directory_insert_file(struct pending_file *file_to_add)
{
if (nullptr == pfile_dev_ctx || file_to_add) {
if (nullptr == file_to_add) {
log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "invalid directory or file parameters.");
return -1;
}
log_debug("inserting %s into pending file queue", file_to_add->file_name);
if (TAILQ_EMPTY(&pfile_dev_ctx->entity.dir->file_queue_head)) {
TAILQ_INSERT_TAIL(&pfile_dev_ctx->entity.dir->file_queue_head, file_to_add, next);
pthread_mutex_lock(&g_pending_file_queue.queue_mutex);
if (TAILQ_EMPTY(&g_pending_file_queue.file_queue_head)) {
TAILQ_INSERT_TAIL(&g_pending_file_queue.file_queue_head, file_to_add, next);
} else {
struct pending_file *file_to_compare = TAILQ_FIRST(&pfile_dev_ctx->entity.dir->file_queue_head);
/* pending file queue is not empty */
struct pending_file *file_to_compare = TAILQ_FIRST(&g_pending_file_queue.file_queue_head);
while (file_to_compare != nullptr) {
if (compare_timespec(&file_to_add->modified_time, &file_to_compare->modified_time) < 0) {
TAILQ_INSERT_BEFORE(file_to_compare, file_to_add, next);
file_to_compare = nullptr;
} else {
} else if (compare_timespec(&file_to_add->modified_time, &file_to_compare->modified_time) > 0) {
struct pending_file *next_file_to_compare = TAILQ_NEXT(file_to_compare, next);
if (next_file_to_compare == nullptr) {
TAILQ_INSERT_AFTER(&pfile_dev_ctx->entity.dir->file_queue_head, file_to_compare, file_to_add, next);
TAILQ_INSERT_AFTER(&g_pending_file_queue.file_queue_head, file_to_compare, file_to_add, next);
}
file_to_compare = next_file_to_compare;
} else {
/* find same file, ignore it */
printf("find same file\n");
break;
}
}
}
pthread_mutex_unlock(&g_pending_file_queue.queue_mutex);
return 0;
}
@@ -522,7 +597,7 @@ static ssize_t pcap_directory_collect_pending_files(struct pio_pcap_file_device_
continue;
}
if (pcap_directory_insert_file(pfile_dev_ctx, file_to_add) < 0) {
if (pcap_directory_insert_file(file_to_add) < 0) {
log_error(ST_ERR_PCAP_FILE_COLLECT_FAILED, "failed to insert file into directory");
FREE(file_to_add);
return -1;
@@ -549,19 +624,25 @@ static ssize_t pcap_directory_dispatch(struct pio_pcap_file_device_context *pfil
return -1;
}
/* file_queue is empty */
if (TAILQ_EMPTY(&pfile_dev_ctx->entity.dir->file_queue_head)) {
log_info("directory %s has no files to process", pfile_dev_ctx->entity.dir->dir_name);
return 0;
}
struct timespec last_time_seen;
memset(&last_time_seen, 0, sizeof(struct timespec));
// file_queue_head has pending files
if (nullptr == pfile_dev_ctx->entity.dir->current_file) {
/* not open file yet */
struct pending_file *current_file = TAILQ_FIRST(&pfile_dev_ctx->entity.dir->file_queue_head);
/* not open file yet */
if (nullptr == pfile_dev_ctx->entity.dir->current_file[rxq_id]) {
pthread_mutex_lock(&g_pending_file_queue.queue_mutex);
/* file_queue is empty */
if (TAILQ_EMPTY(&g_pending_file_queue.file_queue_head)) {
pthread_mutex_unlock(&g_pending_file_queue.queue_mutex);
log_info("directory %s has no files to process", pfile_dev_ctx->entity.dir->dir_name);
return 0;
}
struct pending_file *current_file = TAILQ_FIRST(&g_pending_file_queue.file_queue_head);
TAILQ_REMOVE(&g_pending_file_queue.file_queue_head, current_file, next);
pthread_mutex_unlock(&g_pending_file_queue.queue_mutex);
pfile_dev_ctx->entity.dir->pending_file[rxq_id] = current_file;
log_info("processing file %s", current_file->file_name);
struct pcap_plain_file_info *pfile_info = CALLOC(struct pcap_plain_file_info, 1);
if (nullptr == pfile_info) {
@@ -584,28 +665,35 @@ static ssize_t pcap_directory_dispatch(struct pio_pcap_file_device_context *pfil
FREE(current_file);
FREE(pfile_info);
return -1;
} else {
pfile_dev_ctx->entity.dir->current_file = pfile_info;
res = pcap_file_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts);
} else {
pfile_dev_ctx->entity.dir->current_file[rxq_id] = pfile_info;
res = pcap_directory_file_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts);
if (res < 0) {
FREE(current_file);
return -1;
}
log_info("processed file %s, processed up to %" PRIuMAX,
current_file->file_name, (uintmax_t)timespec_to_millisecond(&current_file->modified_time));
if (compare_timespec(&current_file->modified_time, &last_time_seen) > 0) {
copy_timespec(&current_file->modified_time, &last_time_seen);
}
FREE(current_file);
if (res == 0) { // reach the end of the file
pfile_dev_ctx->entity.dir->current_file = nullptr;
cleanup_pcap_plain_file_info(pfile_dev_ctx->entity.dir->current_file[rxq_id]);
FREE(pfile_dev_ctx->entity.dir->current_file[rxq_id]);
FREE(current_file);
}
}
} else {
/* file has been opened */
res = pcap_file_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts);
res = pcap_directory_file_dispatch(pfile_dev_ctx, rxq_id, pkts, nr_pkts);
if (res < 0) {
return -1;
} else if (res == 0) {
cleanup_pcap_plain_file_info(pfile_dev_ctx->entity.dir->current_file[rxq_id]);
FREE(pfile_dev_ctx->entity.dir->current_file[rxq_id]);
FREE(pfile_dev_ctx->entity.dir->pending_file[rxq_id]);
}
}