diff --git a/src/packet_io/packet_queue.cpp b/src/packet_io/packet_queue.cpp index 1c0a6a0..787a4ab 100644 --- a/src/packet_io/packet_queue.cpp +++ b/src/packet_io/packet_queue.cpp @@ -8,8 +8,7 @@ struct packet_queue { - pthread_mutex_t lock; - struct packet **queue; + uint64_t *queue; uint32_t size; uint32_t head; uint32_t tail; @@ -24,7 +23,7 @@ struct packet_queue *packet_queue_create(uint32_t size) return NULL; } - queue->queue = (struct packet **)calloc(size, sizeof(struct packet *)); + queue->queue = (uint64_t *)calloc(size, sizeof(uint64_t)); if (queue->queue == NULL) { PACKET_IO_LOG_ERROR("unable to alloc packet queue buffer"); @@ -35,7 +34,6 @@ struct packet_queue *packet_queue_create(uint32_t size) queue->size = size; queue->head = 0; queue->tail = 0; - pthread_mutex_init(&queue->lock, NULL); return queue; } @@ -64,49 +62,33 @@ void packet_queue_destory(struct packet_queue *queue) queue->queue = NULL; } - pthread_mutex_destroy(&queue->lock); - free(queue); } -int packet_queue_is_full(struct packet_queue *queue) -{ - return (queue->tail + 1) % queue->size == queue->head; -} - -int packet_queue_is_empty(struct packet_queue *queue) -{ - return queue->head == queue->tail; -} - void packet_queue_push(struct packet_queue *queue, struct packet *pkt) { + uint64_t wait = 1000; retry: - pthread_mutex_lock(&queue->lock); - if (packet_queue_is_full(queue)) + if (__sync_val_compare_and_swap(&queue->queue[queue->tail], 0, pkt) != 0) { PACKET_IO_LOG_ERROR("packet queue is full, retry later"); - pthread_mutex_unlock(&queue->lock); - sleep(1); + usleep(wait); + wait *= 2; goto retry; } - queue->queue[queue->tail] = pkt; queue->tail = (queue->tail + 1) % queue->size; - pthread_mutex_unlock(&queue->lock); } void packet_queue_pop(struct packet_queue *queue, struct packet **pkt) { - pthread_mutex_lock(&queue->lock); - if (packet_queue_is_empty(queue)) + uint64_t read = ATOMIC_READ(&queue->queue[queue->head]); + if (read == 0) { - pthread_mutex_unlock(&queue->lock); *pkt = NULL; return; } - - *pkt = queue->queue[queue->head]; + __sync_val_compare_and_swap(&queue->queue[queue->head], read, 0); + *pkt = (struct packet *)read; queue->head = (queue->head + 1) % queue->size; - pthread_mutex_unlock(&queue->lock); }