refactor: move packet queue to dumpfile_io.cpp

This commit is contained in:
luwenpeng
2024-08-21 14:33:03 +08:00
parent f2f5441b4c
commit 415c21440f
10 changed files with 139 additions and 168 deletions

View File

@@ -16,7 +16,6 @@
#include "packet_private.h"
#include "packet_parser.h"
#include "packet_dump.h"
#include "lock_free_queue.h"
#define PACKET_IO_LOG_STATE(format, ...) LOG_STATE("dumpfile", format, ##__VA_ARGS__)
#define PACKET_IO_LOG_ERROR(format, ...) LOG_ERROR("dumpfile", format, ##__VA_ARGS__)
@@ -30,7 +29,7 @@ struct dumpfile_io
char dumpfile_path[256];
pcap_t *pcap;
struct lock_free_queue *queue[MAX_THREAD_NUM];
struct packet_queue *queue[MAX_THREAD_NUM];
struct packet_io_stat stat[MAX_THREAD_NUM];
uint64_t io_thread_need_exit;
uint64_t io_thread_is_runing;
@@ -44,10 +43,93 @@ struct pcap_pkt
};
/******************************************************************************
* Private API
* Private API -- queue
******************************************************************************/
static void pcap_packet_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes)
struct packet_queue
{
uint64_t *queue;
uint32_t size;
uint32_t head;
uint32_t tail;
};
static struct packet_queue *packet_queue_new(uint32_t size)
{
struct packet_queue *queue = (struct packet_queue *)calloc(1, sizeof(struct packet_queue));
if (queue == NULL)
{
PACKET_IO_LOG_ERROR("unable to new packet queue");
return NULL;
}
queue->queue = (uint64_t *)calloc(size, sizeof(uint64_t));
if (queue->queue == NULL)
{
PACKET_IO_LOG_ERROR("unable to new packet queue");
free(queue);
return NULL;
}
queue->size = size;
queue->head = 0;
queue->tail = 0;
return queue;
}
static void packet_queue_free(struct packet_queue *queue)
{
if (queue == NULL)
{
return;
}
if (queue->queue)
{
free(queue->queue);
queue->queue = NULL;
}
free(queue);
}
static int packet_queue_push(struct packet_queue *queue, void *data)
{
if (__sync_val_compare_and_swap(&queue->queue[queue->tail], 0, data) != 0)
{
PACKET_IO_LOG_ERROR("packet queue is full, retry later");
return -1;
}
queue->tail = (queue->tail + 1) % queue->size;
return 0;
}
static void packet_queue_pop(struct packet_queue *queue, void **data)
{
uint64_t read = ATOMIC_READ(&queue->queue[queue->head]);
if (read == 0)
{
*data = NULL;
return;
}
__sync_val_compare_and_swap(&queue->queue[queue->head], read, 0);
*data = (void *)read;
queue->head = (queue->head + 1) % queue->size;
}
static int packet_queue_isempty(struct packet_queue *queue)
{
uint64_t read = ATOMIC_READ(&queue->queue[queue->head]);
return read == 0;
}
/******************************************************************************
* Private API -- utils
******************************************************************************/
static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes)
{
struct dumpfile_io *handle = (struct dumpfile_io *)user;
@@ -69,8 +151,8 @@ static void pcap_packet_handler(u_char *user, const struct pcap_pkthdr *h, const
uint64_t hash = packet_ldbc_hash(&pkt, PKT_LDBC_METH_OUTERMOST_INT_EXT_IP, PACKET_DIRECTION_OUTGOING);
// push packet to queue
struct lock_free_queue *queue = handle->queue[hash % handle->nr_threads];
while (lock_free_queue_push(queue, pcap_pkt) == -1)
struct packet_queue *queue = handle->queue[hash % handle->nr_threads];
while (packet_queue_push(queue, pcap_pkt) == -1)
{
if (ATOMIC_READ(&handle->io_thread_need_exit))
{
@@ -103,7 +185,7 @@ static int dumpfile_handler(struct dumpfile_io *handle, const char *pcap_file)
PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf);
return -1;
}
pcap_loop(handle->pcap, -1, pcap_packet_handler, (u_char *)handle);
pcap_loop(handle->pcap, -1, pcap_pkt_handler, (u_char *)handle);
pcap_close(handle->pcap);
PACKET_IO_LOG_STATE("dumpfile %s processed", resolved_path)
@@ -115,7 +197,7 @@ static int all_packet_processed(struct dumpfile_io *handle)
{
for (uint16_t i = 0; i < handle->nr_threads; i++)
{
if (!lock_free_queue_empty(handle->queue[i]))
if (!packet_queue_isempty(handle->queue[i]))
{
return 0;
}
@@ -213,7 +295,7 @@ struct dumpfile_io *dumpfile_io_new(const char *dumpfile_path, enum packet_io_mo
for (uint16_t i = 0; i < handle->nr_threads; i++)
{
handle->queue[i] = lock_free_queue_new(MAX_PACKET_QUEUE_SIZE);
handle->queue[i] = packet_queue_new(MAX_PACKET_QUEUE_SIZE);
if (handle->queue[i] == NULL)
{
PACKET_IO_LOG_ERROR("unable to create packet queue");
@@ -249,7 +331,7 @@ void dumpfile_io_free(struct dumpfile_io *handle)
{
while (1)
{
lock_free_queue_pop(handle->queue[i], (void **)&pcap_pkt);
packet_queue_pop(handle->queue[i], (void **)&pcap_pkt);
if (pcap_pkt)
{
free(pcap_pkt);
@@ -260,14 +342,14 @@ void dumpfile_io_free(struct dumpfile_io *handle)
}
}
lock_free_queue_free(handle->queue[i]);
packet_queue_free(handle->queue[i]);
}
free(handle);
handle = NULL;
}
}
int dumpfile_io_wait_exit(struct dumpfile_io *handle)
int dumpfile_io_isbreak(struct dumpfile_io *handle)
{
return ATOMIC_READ(&handle->io_thread_wait_exit);
}
@@ -277,17 +359,17 @@ int dumpfile_io_init(struct dumpfile_io *handle __attribute__((unused)), uint16_
return 0;
}
int dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts)
uint16_t dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
struct lock_free_queue *queue = handle->queue[thr_idx];
struct packet_queue *queue = handle->queue[thr_idx];
struct packet_io_stat *stat = &handle->stat[thr_idx];
struct pcap_pkt *pcap_pkt = NULL;
struct packet *pkt;
int nr_parsed = 0;
uint16_t nr_parsed = 0;
for (int i = 0; i < nr_pkts; i++)
for (uint16_t i = 0; i < nr_pkts; i++)
{
lock_free_queue_pop(queue, (void **)&pcap_pkt);
packet_queue_pop(queue, (void **)&pcap_pkt);
if (pcap_pkt == NULL)
{
break;
@@ -312,13 +394,13 @@ int dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct pac
return nr_parsed;
}
void dumpfile_io_egress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts)
void dumpfile_io_egress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
int len;
struct packet *pkt = NULL;
struct packet_io_stat *stat = &handle->stat[thr_idx];
for (int i = 0; i < nr_pkts; i++)
for (uint16_t i = 0; i < nr_pkts; i++)
{
pkt = &pkts[i];
len = packet_get_raw_len(pkt);
@@ -338,12 +420,12 @@ void dumpfile_io_egress(struct dumpfile_io *handle, uint16_t thr_idx, struct pac
}
}
void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts)
void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
struct packet *pkt = NULL;
struct packet_io_stat *stat = &handle->stat[thr_idx];
for (int i = 0; i < nr_pkts; i++)
for (uint16_t i = 0; i < nr_pkts; i++)
{
pkt = &pkts[i];
struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt);
@@ -357,7 +439,7 @@ void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packe
}
}
int dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts)
uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
uint16_t len;
struct packet *pkt = NULL;
@@ -368,7 +450,7 @@ int dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct pack
char src_addr[INET6_ADDRSTRLEN] = {0};
char dst_addr[INET6_ADDRSTRLEN] = {0};
for (int i = 0; i < nr_pkts; i++)
for (uint16_t i = 0; i < nr_pkts; i++)
{
pkt = &pkts[i];
len = packet_get_raw_len(pkt);