Add packet utils to packet.h
This commit is contained in:
@@ -57,7 +57,17 @@ static void pcap_handle(u_char *user, const struct pcap_pkthdr *h, const u_char
|
||||
|
||||
// push packet to queue
|
||||
struct lock_free_queue *queue = handle->queue[hash % handle->nr_threads];
|
||||
lock_free_queue_push(queue, pcap_pkt);
|
||||
while (lock_free_queue_push(queue, pcap_pkt) == -1)
|
||||
{
|
||||
if (ATOMIC_READ(&handle->io_thread_need_exit))
|
||||
{
|
||||
free(pcap_pkt);
|
||||
PACKET_IO_LOG_STATE("dumpfile io thread need exit");
|
||||
pcap_breakloop(handle->pcap);
|
||||
break;
|
||||
}
|
||||
usleep(1000);
|
||||
}
|
||||
|
||||
if (ATOMIC_READ(&handle->io_thread_need_exit))
|
||||
{
|
||||
@@ -89,7 +99,7 @@ static int dumpfile_handle(const char *file, void *arg)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *dumpfile_thread_cycle(void *arg)
|
||||
static void *dumpfile_thread(void *arg)
|
||||
{
|
||||
struct dumpfile_io *handle = (struct dumpfile_io *)arg;
|
||||
|
||||
@@ -104,7 +114,7 @@ static void *dumpfile_thread_cycle(void *arg)
|
||||
sleep(1);
|
||||
}
|
||||
|
||||
PACKET_IO_LOG_STATE("dumpfile io thread is exiting");
|
||||
PACKET_IO_LOG_STATE("dumpfile io thread exit !!!");
|
||||
ATOMIC_SET(&handle->io_thread_is_runing, 0);
|
||||
|
||||
return NULL;
|
||||
@@ -125,7 +135,7 @@ struct dumpfile_io *dumpfile_io_new(const char *directory, uint8_t nr_threads)
|
||||
}
|
||||
|
||||
handle->nr_threads = nr_threads;
|
||||
strncpy(handle->directory, directory, strlen(directory));
|
||||
strncpy(handle->directory, directory, MIN(strlen(directory), sizeof(handle->directory)));
|
||||
|
||||
for (uint16_t i = 0; i < handle->nr_threads; i++)
|
||||
{
|
||||
@@ -136,7 +146,7 @@ struct dumpfile_io *dumpfile_io_new(const char *directory, uint8_t nr_threads)
|
||||
goto error_out;
|
||||
}
|
||||
}
|
||||
if (pthread_create(&tid, NULL, dumpfile_thread_cycle, (void *)handle) != 0)
|
||||
if (pthread_create(&tid, NULL, dumpfile_thread, (void *)handle) != 0)
|
||||
{
|
||||
PACKET_IO_LOG_ERROR("unable to create packet io thread");
|
||||
goto error_out;
|
||||
@@ -160,8 +170,22 @@ void dumpfile_io_free(struct dumpfile_io *handle)
|
||||
usleep(1000);
|
||||
}
|
||||
|
||||
struct pcap_pkt *pcap_pkt = NULL;
|
||||
for (uint16_t i = 0; i < handle->nr_threads; i++)
|
||||
{
|
||||
while (1)
|
||||
{
|
||||
lock_free_queue_pop(handle->queue[i], (void **)&pcap_pkt);
|
||||
if (pcap_pkt)
|
||||
{
|
||||
free(pcap_pkt);
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
lock_free_queue_free(handle->queue[i]);
|
||||
}
|
||||
free(handle);
|
||||
@@ -183,6 +207,7 @@ int dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct pac
|
||||
{
|
||||
struct lock_free_queue *queue = handle->queue[thr_idx];
|
||||
struct pcap_pkt *pcap_pkt = NULL;
|
||||
struct packet *pkt;
|
||||
int nr_parsed = 0;
|
||||
|
||||
for (int i = 0; i < nr_pkts; i++)
|
||||
@@ -200,12 +225,12 @@ int dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct pac
|
||||
ATOMIC_ADD(&handle->stat.raw_rx_pkts, 1);
|
||||
ATOMIC_ADD(&handle->stat.raw_rx_bytes, pcap_pkt->len);
|
||||
|
||||
struct packet *pkt = &pkts[nr_parsed++];
|
||||
pkt = &pkts[nr_parsed];
|
||||
memset(pkt, 0, sizeof(struct packet));
|
||||
packet_parse(pkt, pcap_pkt->data, pcap_pkt->len);
|
||||
packet_set_io_ctx(pkt, pcap_pkt);
|
||||
packet_set_type(pkt, PACKET_TYPE_DATA);
|
||||
packet_set_action(pkt, PACKET_ACTION_FORWARD);
|
||||
packet_set_origin(pkt, PACKET_ORIGIN_DUMPFILE);
|
||||
nr_parsed++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -44,12 +44,6 @@ void lock_free_queue_free(struct lock_free_queue *queue)
|
||||
return;
|
||||
}
|
||||
|
||||
// wait queue is empty
|
||||
while (queue->head != queue->tail)
|
||||
{
|
||||
usleep(1000);
|
||||
}
|
||||
|
||||
if (queue->queue)
|
||||
{
|
||||
free(queue->queue);
|
||||
@@ -59,19 +53,16 @@ void lock_free_queue_free(struct lock_free_queue *queue)
|
||||
free(queue);
|
||||
}
|
||||
|
||||
void lock_free_queue_push(struct lock_free_queue *queue, void *data)
|
||||
int lock_free_queue_push(struct lock_free_queue *queue, void *data)
|
||||
{
|
||||
uint64_t wait = 1000;
|
||||
retry:
|
||||
if (__sync_val_compare_and_swap(&queue->queue[queue->tail], 0, data) != 0)
|
||||
{
|
||||
LOCK_FREE_QUEUE_LOG_ERROR("lock free queue is full, retry later");
|
||||
usleep(wait);
|
||||
wait *= 2;
|
||||
goto retry;
|
||||
return -1;
|
||||
}
|
||||
|
||||
queue->tail = (queue->tail + 1) % queue->size;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void lock_free_queue_pop(struct lock_free_queue *queue, void **data)
|
||||
|
||||
@@ -16,7 +16,7 @@ struct lock_free_queue;
|
||||
struct lock_free_queue *lock_free_queue_new(uint32_t size);
|
||||
void lock_free_queue_free(struct lock_free_queue *queue);
|
||||
|
||||
void lock_free_queue_push(struct lock_free_queue *queue, void *data);
|
||||
int lock_free_queue_push(struct lock_free_queue *queue, void *data);
|
||||
void lock_free_queue_pop(struct lock_free_queue *queue, void **data);
|
||||
|
||||
#ifdef __cpluscplus
|
||||
|
||||
@@ -171,8 +171,16 @@ int marsio_io_ingress(struct marsio_io *handle, uint16_t thr_idx, struct packet
|
||||
continue;
|
||||
}
|
||||
|
||||
pkt = &pkts[nr_parsed];
|
||||
memset(pkt, 0, sizeof(struct packet));
|
||||
packet_parse(pkt, data, len);
|
||||
packet_set_io_ctx(pkt, mbuff);
|
||||
packet_set_origin(pkt, PACKET_ORIGIN_MARSIO);
|
||||
nr_parsed++;
|
||||
|
||||
if (marsio_buff_is_ctrlbuf(mbuff))
|
||||
{
|
||||
packet_set_ctrl(pkt);
|
||||
ATOMIC_ADD(&handle->stat.ctrl_rx_pkts, 1);
|
||||
ATOMIC_ADD(&handle->stat.ctrl_rx_bytes, len);
|
||||
}
|
||||
@@ -181,12 +189,6 @@ int marsio_io_ingress(struct marsio_io *handle, uint16_t thr_idx, struct packet
|
||||
ATOMIC_ADD(&handle->stat.raw_rx_pkts, 1);
|
||||
ATOMIC_ADD(&handle->stat.raw_rx_bytes, len);
|
||||
}
|
||||
|
||||
pkt = &pkts[nr_parsed];
|
||||
packet_parse(pkt, data, len);
|
||||
packet_set_io_ctx(pkt, mbuff);
|
||||
packet_set_action(pkt, PACKET_ACTION_FORWARD);
|
||||
nr_parsed++;
|
||||
}
|
||||
|
||||
return nr_parsed;
|
||||
|
||||
Reference in New Issue
Block a user