tango-tfe/common/src/io_uring.cpp

#include <string.h>
#include <stdlib.h>
#include <errno.h> /* errno is consulted in the error paths below */
#include "tfe_utils.h"
#include "io_uring.h"
extern void *g_default_logger;
#if (SUPPORT_LIBURING)
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <sys/uio.h>
#include <liburing.h>
enum ioevent
{
IOEVENT_UNKNOWN = 0,
IOEVENT_READ = 1,
IOEVENT_WRITE = 2,
};
struct iobuffer
{
struct iovec vec;
enum ioevent event;
struct iobuffer *next;
};
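/*
 * Free-list pool of iobuffers: buffer_num buffers of buffer_size bytes are
 * pre-allocated at creation time, buffer_used counts buffers currently
 * handed out, and buffer_left counts buffers sitting on free_list. When the
 * free list runs dry, iobuffer_pool_pop() falls back to allocating an extra
 * buffer, and iobuffer_pool_push() frees any surplus beyond buffer_num.
 */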
struct iobuffer_pool
{
int buffer_size;
int buffer_num;
int buffer_used;
int buffer_left;
struct iobuffer *free_list;
};
struct io_uring_instance
{
int cfg_ring_size;
int cfg_buff_size;
int cfg_flags;
int cfg_sq_thread_idle; // milliseconds
int cfg_enable_debug;
int sockfd;
int eventfd;
struct io_uring ring;
struct io_uring_params params;
void *cb_args;
io_uring_read_cb *read_cb;
struct iobuffer_pool *pool;
};
/******************************************************************************
*
******************************************************************************/
static const char *ioevent_tostring(enum ioevent event);
static struct iobuffer *iobuffer_create(int buffer_size);
static void iobuffer_destory(struct iobuffer *buffer);
struct iobuffer_pool *iobuffer_pool_create(int buffer_size, int buffer_num);
void iobuffer_pool_destory(struct iobuffer_pool *pool);
void iobuffer_pool_print(struct iobuffer_pool *pool);
struct iobuffer *iobuffer_pool_pop(struct iobuffer_pool *pool);
void iobuffer_pool_push(struct iobuffer_pool *pool, struct iobuffer *buffer);
struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int ring_size, int buff_size, int flags, int sq_thread_idle, int enable_debug);
void io_uring_instance_destory(struct io_uring_instance *instance);
static int io_uring_read(struct io_uring_instance *instance, struct iobuffer *buffer);
int io_uring_write(struct io_uring_instance *instance, const char *data, int len);
int io_uring_set_read_cb(struct io_uring_instance *instance, io_uring_read_cb *read_cb, void *cb_args);
int io_uring_polling(struct io_uring_instance *instance);
/******************************************************************************
*
******************************************************************************/
static const char *ioevent_tostring(enum ioevent event)
{
switch (event)
{
case IOEVENT_READ:
return "read";
case IOEVENT_WRITE:
return "write";
case IOEVENT_UNKNOWN: /* fall through */
default:
return "unknown";
}
}
static struct iobuffer *iobuffer_create(int buffer_size)
{
struct iobuffer *buffer = (struct iobuffer *)calloc(1, sizeof(struct iobuffer));
if (buffer == NULL)
{
goto error_out;
}
buffer->vec.iov_len = buffer_size;
buffer->vec.iov_base = (void *)calloc(buffer->vec.iov_len, sizeof(char));
if (buffer->vec.iov_base == NULL)
{
goto error_out;
}
buffer->event = IOEVENT_UNKNOWN;
buffer->next = NULL;
return buffer;
error_out:
iobuffer_destory(buffer);
return NULL;
}
static void iobuffer_destory(struct iobuffer *buffer)
{
if (buffer)
{
if (buffer->vec.iov_base)
{
free(buffer->vec.iov_base);
buffer->vec.iov_base = NULL;
}
free(buffer);
buffer = NULL;
}
}
struct iobuffer_pool *iobuffer_pool_create(int buffer_size, int buffer_num)
{
struct iobuffer *head = NULL;
struct iobuffer *tail = NULL;
struct iobuffer *node = NULL;
struct iobuffer_pool *pool = (struct iobuffer_pool *)calloc(1, sizeof(struct iobuffer_pool));
if (pool == NULL)
{
goto error_out;
}
pool->buffer_size = buffer_size;
pool->buffer_used = 0;
for (int i = 0; i < buffer_num; i++)
{
node = iobuffer_create(pool->buffer_size);
if (node == NULL)
{
goto error_out;
}
if (head == NULL)
{
head = node;
tail = node;
}
else
{
tail->next = node;
tail = node;
}
pool->buffer_left++;
pool->buffer_num++;
}
pool->free_list = head;
return pool;
error_out:
iobuffer_pool_destory(pool);
return NULL;
}
void iobuffer_pool_destory(struct iobuffer_pool *pool)
{
if (pool)
{
struct iobuffer *next = NULL;
struct iobuffer *node = pool->free_list;
while (node)
{
next = node->next;
iobuffer_destory(node);
node = next;
}
free(pool);
pool = NULL;
}
}
void iobuffer_pool_print(struct iobuffer_pool *pool)
{
if (pool)
{
printf(" pool->buffer_size : %d\n", pool->buffer_size);
printf(" pool->buffer_num : %d\n", pool->buffer_num);
printf(" pool->buffer_used : %d\n", pool->buffer_used);
printf(" pool->buffer_left : %d\n", pool->buffer_left);
printf(" pool->free_list : \n");
struct iobuffer *node = pool->free_list;
while (node)
{
printf(" node : %p\n", node);
printf(" node->next : %p\n", node->next);
node = node->next;
}
printf("\n");
}
}
struct iobuffer *iobuffer_pool_pop(struct iobuffer_pool *pool)
{
struct iobuffer *buffer = NULL;
if (pool == NULL)
{
assert(0);
return NULL;
}
if (pool->buffer_left <= 0)
{
buffer = iobuffer_create(pool->buffer_size);
if (buffer)
{
pool->buffer_used++;
return buffer;
}
else
{
return NULL;
}
}
if (pool->free_list == NULL)
{
assert(0);
return NULL;
}
buffer = pool->free_list;
pool->free_list = buffer->next;
buffer->next = NULL;
pool->buffer_used++;
pool->buffer_left--;
return buffer;
}
void iobuffer_pool_push(struct iobuffer_pool *pool, struct iobuffer *buffer)
{
if (pool == NULL || buffer == NULL)
{
return;
}
if (pool->buffer_left >= pool->buffer_num)
{
iobuffer_destory(buffer);
pool->buffer_used--;
return;
}
buffer->event = IOEVENT_UNKNOWN;
buffer->vec.iov_len = pool->buffer_size; /* restore full capacity; io_uring_write() may have shrunk iov_len */
buffer->next = pool->free_list;
pool->free_list = buffer;
pool->buffer_left++;
pool->buffer_used--;
}
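/*
 * Illustrative pop/use/push cycle (a sketch of the intended pattern, not
 * code from this file; pool, data and len are placeholder names):
 *
 *     struct iobuffer *b = iobuffer_pool_pop(pool);
 *     if (b != NULL)
 *     {
 *         memcpy(b->vec.iov_base, data, len);  // len must be <= pool->buffer_size
 *         b->vec.iov_len = len;
 *         // ... hand b to the kernel, then once the I/O completes ...
 *         iobuffer_pool_push(pool, b);         // returns b to the free list
 *     }
 */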
/*
 * ring_size : 1024
 * buff_size : 2048
 * flags : 0, or a combination of:
 *   IORING_SETUP_IOPOLL     (1U << 0)  io_context is polled
 *   IORING_SETUP_SQPOLL     (1U << 1)  SQ poll thread
 *   IORING_SETUP_SQ_AFF     (1U << 2)  sq_thread_cpu is valid
 *   IORING_SETUP_CQSIZE     (1U << 3)  app defines CQ size
 *   IORING_SETUP_CLAMP      (1U << 4)  clamp SQ/CQ ring sizes
 *   IORING_SETUP_ATTACH_WQ  (1U << 5)  attach to existing wq
 *   IORING_SETUP_R_DISABLED (1U << 6)  start with ring disabled
 *   IORING_SETUP_SUBMIT_ALL (1U << 7)  continue submit on error
 * sq_thread_idle : 0, or the SQ poll thread idle timeout in milliseconds
 *
 * See the illustrative usage sketch after this function.
 */
struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int ring_size, int buff_size, int flags, int sq_thread_idle, int enable_debug)
{
int ret;
struct io_uring_instance *instance = (struct io_uring_instance *)calloc(1, sizeof(struct io_uring_instance));
if (instance == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to create io_uring instance, %s", strerror(errno));
return NULL;
}
instance->sockfd = sockfd;
instance->eventfd = eventfd;
instance->cfg_ring_size = ring_size;
instance->cfg_buff_size = buff_size;
instance->cfg_flags = flags;
instance->cfg_sq_thread_idle = sq_thread_idle;
instance->cfg_enable_debug = enable_debug;
instance->pool = iobuffer_pool_create(instance->cfg_buff_size, instance->cfg_ring_size);
if (instance->pool == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to create io buffer pool, %s", strerror(errno));
goto error_out;
}
/*
 * Reference: https://unixism.net/loti/tutorial/sq_poll.html#sq-poll
 *
 * To verify that IORING_SETUP_SQPOLL is in effect, run:
 * sudo bpftrace -e 'tracepoint:io_uring:io_uring_submit_sqe {printf("%s(%d)\n", comm, pid);}'
 */
if (instance->cfg_flags)
{
instance->params.flags |= instance->cfg_flags;
}
if (instance->cfg_sq_thread_idle)
{
instance->params.sq_thread_idle = instance->cfg_sq_thread_idle; // milliseconds
}
ret = io_uring_queue_init_params(ring_size, &instance->ring, &instance->params);
if (ret)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to init io_uring queue params, %s", strerror(-ret));
goto error_out;
}
if (instance->eventfd > 0)
{
ret = io_uring_register_eventfd(&instance->ring, instance->eventfd);
if (ret)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to register eventfd for io_uring, %s", strerror(-ret));
goto error_out;
}
}
return instance;
error_out:
io_uring_instance_destory(instance);
return NULL;
}
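/*
 * Illustrative lifecycle sketch (an assumed typical usage, not code from
 * this file; sockfd, evfd, on_read, ctx and running are placeholders):
 *
 *     struct io_uring_instance *inst =
 *         io_uring_instance_create(sockfd, evfd, 1024, 2048,
 *                                  IORING_SETUP_SQPOLL, 2000, 0);
 *     if (inst == NULL)
 *         return;
 *     io_uring_set_read_cb(inst, on_read, ctx);   // arms the first read
 *     io_uring_write(inst, "ping", 4);            // queues a write on sockfd
 *     while (running)
 *         io_uring_polling(inst);                 // reaps completions, re-arms reads
 *     io_uring_instance_destory(inst);
 */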
void io_uring_instance_destory(struct io_uring_instance *instance)
{
if (instance)
{
io_uring_queue_exit(&instance->ring);
iobuffer_pool_destory(instance->pool);
free(instance);
instance = NULL;
}
}
// return 0 : success
// return -1 : error
static int io_uring_read(struct io_uring_instance *instance, struct iobuffer *buffer)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&instance->ring);
if (sqe == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to get io_uring sqe, io_uring submission queue is full, drop read type");
return -1;
}
buffer->event = IOEVENT_READ;
io_uring_prep_readv(sqe, instance->sockfd, &buffer->vec, 1, 0);
io_uring_sqe_set_data(sqe, buffer);
if (unlikely(instance->cfg_enable_debug))
{
TFE_LOG_DEBUG(g_default_logger, "IO_URING: submit read entry: sockfd: %02d sqe: %p user_data: %p iov_base: %p iovec_len: %04ld",
instance->sockfd, sqe, (void *)sqe->user_data, buffer->vec.iov_base, buffer->vec.iov_len);
}
io_uring_submit(&instance->ring);
return 0;
}
// return 0 : success
// return -1 : error
int io_uring_write(struct io_uring_instance *instance, const char *data, int len)
{
if (len > instance->cfg_buff_size)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: the length of the sent data [%d] is greater than the capacity of the io_uring buffer [%d]", len, instance->cfg_buff_size);
return -1;
}
/*
 * Reserve the buffer before taking an SQE: an SQE obtained from
 * io_uring_get_sqe() but never prepared would still be submitted later
 * with stale contents.
 */
struct iobuffer *buffer = iobuffer_pool_pop(instance->pool);
if (buffer == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: cannot get fixed buffer");
return -1;
}
struct io_uring_sqe *sqe = io_uring_get_sqe(&instance->ring);
if (sqe == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to get io_uring sqe, io_uring submission queue is full, dropping write request");
iobuffer_pool_push(instance->pool, buffer);
return -1;
}
buffer->event = IOEVENT_WRITE;
buffer->vec.iov_len = len;
memcpy(buffer->vec.iov_base, data, len);
io_uring_prep_writev(sqe, instance->sockfd, &buffer->vec, 1, 0);
io_uring_sqe_set_data(sqe, buffer);
if (unlikely(instance->cfg_enable_debug))
{
TFE_LOG_DEBUG(g_default_logger, "IO_URING: submit write entry: sockfd: %02d sqe: %p user_data: %p iov_base: %p iovec_len: %04ld",
instance->sockfd, sqe, (void *)sqe->user_data, buffer->vec.iov_base, buffer->vec.iov_len);
}
io_uring_submit(&instance->ring);
return 0;
}
// return 0 : success
// return -1 : error
int io_uring_set_read_cb(struct io_uring_instance *instance, io_uring_read_cb *read_cb, void *cb_args)
{
if (instance->read_cb)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: cannot register read callback multiple times");
return -1;
}
instance->read_cb = read_cb;
instance->cb_args = cb_args;
struct iobuffer *buffer = iobuffer_pool_pop(instance->pool);
if (buffer == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: cannot get fixed buffer");
return -1;
}
if (io_uring_read(instance, buffer) == -1)
{
iobuffer_pool_push(instance->pool, buffer);
return -1;
}
return 0;
}
// drains the completion queue in batches of MAX_BATCH_CQE_NUM and returns the number of processed entries
int io_uring_polling(struct io_uring_instance *instance)
{
int ret = 0;
int total = 0;
struct io_uring_cqe *cqes[MAX_BATCH_CQE_NUM];
struct io_uring_cqe *cqe = NULL;
struct iobuffer *buffer = NULL;
void *cb_args = instance->cb_args;
struct io_uring *ring = &instance->ring;
struct iobuffer_pool *pool = instance->pool;
io_uring_read_cb *read_cb = instance->read_cb;
int enable_debug = instance->cfg_enable_debug;
while (1)
{
ret = io_uring_peek_batch_cqe(ring, cqes, MAX_BATCH_CQE_NUM);
if (ret <= 0)
{
return total;
}
total += ret;
for (int i = 0; i < ret; i++)
{
cqe = cqes[i];
if (cqe == NULL || (void *)cqe->user_data == NULL)
{
// TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to get io_uring cqe, null is returned");
if (cqe)
{
io_uring_cqe_seen(ring, cqe); /* consume the stray CQE so it is not peeked again */
}
continue;
}
buffer = (struct iobuffer *)cqe->user_data;
if (unlikely(enable_debug))
{
TFE_LOG_DEBUG(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d cqe: %p user_data: %p iov_base: %p iovec_len: %04ld cqe->res: %04d",
ioevent_tostring(buffer->event), instance->sockfd, cqe, (void *)buffer, buffer->vec.iov_base, buffer->vec.iov_len, cqe->res);
}
switch (buffer->event)
{
case IOEVENT_READ:
if (cqe->res > 0)
{
if (read_cb)
{
read_cb((const char *)buffer->vec.iov_base, cqe->res, cb_args);
}
}
cqe->user_data = 0;
io_uring_cqe_seen(ring, cqe);
if (io_uring_read(instance, buffer) == -1)
{
iobuffer_pool_push(pool, buffer); /* SQ is full: return the buffer instead of leaking it */
}
break;
case IOEVENT_WRITE:
if (cqe->res < 0)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d (%d, %s)",
ioevent_tostring(buffer->event), instance->sockfd, -cqe->res, strerror(-cqe->res));
}
iobuffer_pool_push(pool, buffer);
cqe->user_data = 0;
io_uring_cqe_seen(ring, cqe);
break;
default:
iobuffer_pool_push(pool, buffer);
cqe->user_data = 0;
io_uring_cqe_seen(ring, cqe);
break;
}
}
if (ret < MAX_BATCH_CQE_NUM)
{
return total;
}
}
}
#else
struct io_uring_instance
{
};
struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int ring_size, int buff_size, int flags, int sq_thread_idle, int enable_debug)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return NULL;
}
void io_uring_instance_destory(struct io_uring_instance *instance)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
}
int io_uring_set_read_cb(struct io_uring_instance *instance, io_uring_read_cb *read_cb, void *cb_arg)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return -1;
}
int io_uring_write(struct io_uring_instance *instance, const char *data, int len)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return -1;
}
int io_uring_polling(struct io_uring_instance *instance)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return 0;
}
#endif