packet IO dumpfile mode use lock free queue
This commit is contained in:
@@ -8,8 +8,7 @@
|
||||
|
||||
struct packet_queue
|
||||
{
|
||||
pthread_mutex_t lock;
|
||||
struct packet **queue;
|
||||
uint64_t *queue;
|
||||
uint32_t size;
|
||||
uint32_t head;
|
||||
uint32_t tail;
|
||||
@@ -24,7 +23,7 @@ struct packet_queue *packet_queue_create(uint32_t size)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
queue->queue = (struct packet **)calloc(size, sizeof(struct packet *));
|
||||
queue->queue = (uint64_t *)calloc(size, sizeof(uint64_t));
|
||||
if (queue->queue == NULL)
|
||||
{
|
||||
PACKET_IO_LOG_ERROR("unable to alloc packet queue buffer");
|
||||
@@ -35,7 +34,6 @@ struct packet_queue *packet_queue_create(uint32_t size)
|
||||
queue->size = size;
|
||||
queue->head = 0;
|
||||
queue->tail = 0;
|
||||
pthread_mutex_init(&queue->lock, NULL);
|
||||
|
||||
return queue;
|
||||
}
|
||||
@@ -64,49 +62,33 @@ void packet_queue_destory(struct packet_queue *queue)
|
||||
queue->queue = NULL;
|
||||
}
|
||||
|
||||
pthread_mutex_destroy(&queue->lock);
|
||||
|
||||
free(queue);
|
||||
}
|
||||
|
||||
int packet_queue_is_full(struct packet_queue *queue)
|
||||
{
|
||||
return (queue->tail + 1) % queue->size == queue->head;
|
||||
}
|
||||
|
||||
int packet_queue_is_empty(struct packet_queue *queue)
|
||||
{
|
||||
return queue->head == queue->tail;
|
||||
}
|
||||
|
||||
void packet_queue_push(struct packet_queue *queue, struct packet *pkt)
|
||||
{
|
||||
uint64_t wait = 1000;
|
||||
retry:
|
||||
pthread_mutex_lock(&queue->lock);
|
||||
if (packet_queue_is_full(queue))
|
||||
if (__sync_val_compare_and_swap(&queue->queue[queue->tail], 0, pkt) != 0)
|
||||
{
|
||||
PACKET_IO_LOG_ERROR("packet queue is full, retry later");
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
sleep(1);
|
||||
usleep(wait);
|
||||
wait *= 2;
|
||||
goto retry;
|
||||
}
|
||||
|
||||
queue->queue[queue->tail] = pkt;
|
||||
queue->tail = (queue->tail + 1) % queue->size;
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
}
|
||||
|
||||
void packet_queue_pop(struct packet_queue *queue, struct packet **pkt)
|
||||
{
|
||||
pthread_mutex_lock(&queue->lock);
|
||||
if (packet_queue_is_empty(queue))
|
||||
uint64_t read = ATOMIC_READ(&queue->queue[queue->head]);
|
||||
if (read == 0)
|
||||
{
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
*pkt = NULL;
|
||||
return;
|
||||
}
|
||||
|
||||
*pkt = queue->queue[queue->head];
|
||||
__sync_val_compare_and_swap(&queue->queue[queue->head], read, 0);
|
||||
*pkt = (struct packet *)read;
|
||||
queue->head = (queue->head + 1) % queue->size;
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user