#include #include #include #include #include "macro.h" #include "lock_free_queue.h" struct lock_free_queue { uint64_t *queue; uint32_t size; uint32_t head; uint32_t tail; }; struct lock_free_queue *lock_free_queue_new(uint32_t size) { struct lock_free_queue *queue = (struct lock_free_queue *)calloc(1, sizeof(struct lock_free_queue)); if (queue == NULL) { LOCK_FREE_QUEUE_LOG_ERROR("unable to new lock free queue"); return NULL; } queue->queue = (uint64_t *)calloc(size, sizeof(uint64_t)); if (queue->queue == NULL) { LOCK_FREE_QUEUE_LOG_ERROR("unable to new lock free queue"); free(queue); return NULL; } queue->size = size; queue->head = 0; queue->tail = 0; return queue; } void lock_free_queue_free(struct lock_free_queue *queue) { if (queue == NULL) { return; } if (queue->queue) { free(queue->queue); queue->queue = NULL; } free(queue); } int lock_free_queue_push(struct lock_free_queue *queue, void *data) { if (__sync_val_compare_and_swap(&queue->queue[queue->tail], 0, data) != 0) { LOCK_FREE_QUEUE_LOG_ERROR("lock free queue is full, retry later"); return -1; } queue->tail = (queue->tail + 1) % queue->size; return 0; } void lock_free_queue_pop(struct lock_free_queue *queue, void **data) { uint64_t read = ATOMIC_READ(&queue->queue[queue->head]); if (read == 0) { *data = NULL; return; } __sync_val_compare_and_swap(&queue->queue[queue->head], read, 0); *data = (void *)read; queue->head = (queue->head + 1) % queue->size; }