feature: consume all packets and free all sessions before exit

This commit is contained in:
luwenpeng
2024-08-27 16:19:20 +08:00
parent 11bf852c15
commit 79e70f7145
10 changed files with 103 additions and 96 deletions

View File

@@ -35,6 +35,9 @@ struct dumpfile_io
uint64_t io_thread_need_exit;
uint64_t io_thread_is_runing;
uint64_t io_thread_wait_exit;
uint64_t read_pcap_files;
uint64_t read_pcap_pkts;
};
struct pcap_pkt
@@ -121,12 +124,6 @@ static void packet_queue_pop(struct packet_queue *queue, void **data)
queue->head = (queue->head + 1) % queue->size;
}
static int packet_queue_isempty(struct packet_queue *queue)
{
uint64_t read = ATOMIC_READ(&queue->queue[queue->head]);
return read == 0;
}
/******************************************************************************
* Private API -- utils
******************************************************************************/
@@ -146,6 +143,7 @@ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_
pcap_pkt->len = h->caplen;
pcap_pkt->ts = h->ts;
memcpy((char *)pcap_pkt->data, bytes, h->caplen);
ATOMIC_INC(&handle->read_pcap_pkts);
// calculate packet hash
struct packet pkt;
@@ -188,6 +186,7 @@ static int dumpfile_handler(struct dumpfile_io *handle, const char *pcap_file)
PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf);
return -1;
}
handle->read_pcap_files++;
pcap_loop(handle->pcap, -1, pcap_pkt_handler, (u_char *)handle);
pcap_close(handle->pcap);
@@ -196,16 +195,22 @@ static int dumpfile_handler(struct dumpfile_io *handle, const char *pcap_file)
return 0;
}
static int all_packet_processed(struct dumpfile_io *handle)
static int all_packet_consumed(struct dumpfile_io *handle)
{
uint64_t consumed_pkts = 0;
uint64_t read_pcap_pkts = ATOMIC_READ(&handle->read_pcap_pkts);
for (uint16_t i = 0; i < handle->nr_threads; i++)
{
if (!packet_queue_isempty(handle->queue[i]))
{
return 0;
}
consumed_pkts += ATOMIC_READ(&handle->stat[i].pkts_rx);
}
if (consumed_pkts == read_pcap_pkts)
{
return 1;
}
else
{
return 0;
}
return 1;
}
static void *dumpfile_thread(void *arg)
@@ -264,7 +269,7 @@ static void *dumpfile_thread(void *arg)
erro_out:
while (ATOMIC_READ(&handle->io_thread_need_exit) == 0)
{
if (all_packet_processed(handle))
if (all_packet_consumed(handle))
{
ATOMIC_SET(&handle->io_thread_wait_exit, 1);
}
@@ -330,6 +335,8 @@ void dumpfile_io_free(struct dumpfile_io *handle)
usleep(1000);
}
PACKET_IO_LOG_FATAL("dumpfile io thread read pcap files %lu, read pcap pkts %lu", handle->read_pcap_files, ATOMIC_READ(&handle->read_pcap_pkts));
struct pcap_pkt *pcap_pkt = NULL;
for (uint16_t i = 0; i < handle->nr_threads; i++)
{
@@ -380,7 +387,7 @@ uint16_t dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struc
}
else
{
stat->pkts_rx++;
ATOMIC_INC(&stat->pkts_rx);
stat->bytes_rx += pcap_pkt->len;
stat->raw_pkts_rx++;