This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/common/src/io_uring.cpp

388 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <string.h>
#include <stdlib.h>
#include "tfe_utils.h"
#include "io_uring.h"
extern void *g_default_logger;
#if (SUPPORT_LIBURING)
#include <liburing.h>
enum entry_type
{
ENTRY_TYPE_UNKNOWN = 0,
ENTRY_TYPE_READ = 1,
ENTRY_TYPE_WRITE = 2,
};
struct user_data
{
enum entry_type type;
struct iovec vec;
};
struct config
{
int ring_size;
int buff_size;
int flags;
int sq_thread_idle; // milliseconds
int enable_debug;
};
struct io_uring_instance
{
int sockfd;
int eventfd;
struct config config;
struct io_uring ring;
struct io_uring_params params;
void *read_cb_args;
read_callback *read_cb;
int read_buff_num;
struct user_data **read_buffs;
};
static const char *entry_type_to_string(enum entry_type type)
{
switch (type)
{
case ENTRY_TYPE_READ:
return "read";
case ENTRY_TYPE_WRITE:
return "write";
case ENTRY_TYPE_UNKNOWN: /* fall passthrough */
default:
return "unknown";
}
}
static void user_data_destory(struct user_data *data)
{
if (data)
{
if (data->vec.iov_base)
{
free(data->vec.iov_base);
data->vec.iov_base = NULL;
}
free(data);
data = NULL;
}
}
static struct user_data *user_data_create(int buff_size)
{
struct user_data *data = (struct user_data *)calloc(1, sizeof(struct user_data));
data->vec.iov_base = (void *)calloc(buff_size, sizeof(char));
data->vec.iov_len = buff_size;
return data;
}
void io_uring_instance_destory(struct io_uring_instance *instance)
{
if (instance)
{
io_uring_queue_exit(&instance->ring);
if (instance->read_buffs)
{
for (int i = 0; i < instance->read_buff_num; i++)
{
if (instance->read_buffs[i])
{
user_data_destory(instance->read_buffs[i]);
instance->read_buffs[i] = NULL;
}
}
free(instance->read_buffs);
instance->read_buffs = NULL;
}
free(instance);
instance = NULL;
}
}
/*
* ring_size : 1024
* buff_size : 2048
* flags : 0
* IORING_SETUP_IOPOLL(1U << 0) io_context is polled
* IORING_SETUP_SQPOLL(1U << 1) SQ poll thread
* IORING_SETUP_SQ_AFF(1U << 2) sq_thread_cpu is valid
* ORING_SETUP_CQSIZE(1U << 3) app defines CQ size
* IORING_SETUP_CLAMP(1U << 4) clamp SQ/CQ ring sizes
* IORING_SETUP_ATTACH_WQ(1U << 5) attach to existing wq
* IORING_SETUP_R_DISABLED(1U << 6) start with ring disabled
* IORING_SETUP_SUBMIT_ALL(1U << 7) continue submit on error
* sq_thread_idle : 0
*/
struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int ring_size, int buff_size, int flags, int sq_thread_idle, int enable_debug)
{
int ret;
struct io_uring_instance *instance = (struct io_uring_instance *)calloc(1, sizeof(struct io_uring_instance));
if (instance == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to create io_uring instance, %s", strerror(errno));
return NULL;
}
instance->sockfd = sockfd;
instance->eventfd = eventfd;
instance->config.ring_size = ring_size;
instance->config.buff_size = buff_size;
instance->config.flags = flags;
instance->config.sq_thread_idle = sq_thread_idle;
instance->config.enable_debug = enable_debug;
instance->read_buff_num = 1;
instance->read_buffs = (struct user_data **)calloc(instance->read_buff_num, sizeof(struct user_data *));
for (int i = 0; i < instance->read_buff_num; i++)
{
instance->read_buffs[i] = user_data_create(instance->config.buff_size);
if (instance->read_buffs[i] == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to create user_data, %s", strerror(errno));
goto error_out;
}
}
/*
* 参考资料https://unixism.net/loti/tutorial/sq_poll.html#sq-poll
*
* 执行下面命令进行验证 IORING_SETUP_SQPOLL:
* sudo bpftrace -e 'tracepoint:io_uring:io_uring_submit_sqe {printf("%s(%d)\n", comm, pid);}'
*/
if (instance->config.flags)
{
instance->params.flags |= instance->config.flags;
}
if (instance->config.sq_thread_idle)
{
instance->params.sq_thread_idle = instance->config.sq_thread_idle; // milliseconds
}
ret = io_uring_queue_init_params(ring_size, &instance->ring, &instance->params);
if (ret)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to init io_uring queue params, %s", strerror(-ret));
goto error_out;
}
if (instance->eventfd > 0)
{
ret = io_uring_register_eventfd(&instance->ring, instance->eventfd);
if (ret)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to register eventfd for io_uring, %s", strerror(-ret));
goto error_out;
}
}
return instance;
error_out:
io_uring_instance_destory(instance);
return NULL;
}
// return 0 : success
// reutrn -1 : error
int io_uring_submit_read_entry(struct io_uring_instance *instance, struct user_data *data)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&instance->ring);
if (sqe == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to get io_uring sqe, io_uring submission queue is full, drop read type");
return -1;
}
data->type = ENTRY_TYPE_READ;
io_uring_prep_readv(sqe, instance->sockfd, &data->vec, 1, 0);
io_uring_sqe_set_data(sqe, data);
if (instance->config.enable_debug)
{
TFE_LOG_DEBUG(g_default_logger, "IO_URING: submit read entry: sockfd: %02d sqe: %p user_data: %p iov_base: %p iovec_len: %04ld",
instance->sockfd, sqe, (void *)sqe->user_data, data->vec.iov_base, data->vec.iov_len);
}
io_uring_submit(&instance->ring);
return 0;
}
// return 0 : success
// reutrn -1 : error
int io_uring_submit_write_entry(struct io_uring_instance *instance, const char *data, int len)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&instance->ring);
if (sqe == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to get io_uring sqe, io_uring submission queue is full, drop write type");
return -1;
}
if (len > instance->config.buff_size)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: the length of the sent data is greater than the capacity of the io_uring buffer");
return -1;
}
struct user_data *user_data = user_data_create(instance->config.buff_size);
user_data->type = ENTRY_TYPE_WRITE;
user_data->vec.iov_len = len;
memcpy(user_data->vec.iov_base, data, len);
io_uring_prep_writev(sqe, instance->sockfd, &user_data->vec, 1, 0);
io_uring_sqe_set_data(sqe, user_data);
if (instance->config.enable_debug)
{
TFE_LOG_DEBUG(g_default_logger, "IO_URING: submit write entry: sockfd: %02d sqe: %p user_data: %p iov_base: %p iovec_len: %04ld",
instance->sockfd, sqe, (void *)sqe->user_data, user_data->vec.iov_base, user_data->vec.iov_len);
}
io_uring_submit(&instance->ring);
return 0;
}
// return 0 : success
// reutrn -1 : error
int io_uring_register_read_callback(struct io_uring_instance *instance, read_callback *read_cb, void *read_cb_args)
{
if (instance->read_cb)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: cannot register read callback multiple times");
return -1;
}
instance->read_cb = read_cb;
instance->read_cb_args = read_cb_args;
for (int i = 0; i < instance->read_buff_num; i++)
{
struct user_data *data = instance->read_buffs[i];
if (io_uring_submit_read_entry(instance, data) == -1)
{
return -1;
}
}
return 0;
}
// returns the number of processed entrys
int io_uring_peek_ready_entrys(struct io_uring_instance *instance)
{
int ret = 0;
int total = 0;
struct io_uring_cqe *cqes[MAX_BATCH_CQE_NUM];
while (1)
{
ret = io_uring_peek_batch_cqe(&instance->ring, cqes, MAX_BATCH_CQE_NUM);
if (ret <= 0)
{
return total;
}
total += ret;
for (int i = 0; i < ret; i++)
{
struct io_uring_cqe *cqe = cqes[i];
if (cqe == NULL || (void *)cqe->user_data == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to get io_uring cqe, null is return");
continue;
}
struct user_data *user_data = (struct user_data *)cqe->user_data;
if (instance->config.enable_debug)
{
TFE_LOG_DEBUG(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d cqe: %p user_data: %p iov_base: %p iovec_len: %04ld cqe->res: %04d",
entry_type_to_string(user_data->type), instance->sockfd, cqe, (void *)user_data, user_data->vec.iov_base, user_data->vec.iov_len, cqe->res);
}
switch (user_data->type)
{
case ENTRY_TYPE_READ:
if (cqe->res > 0)
{
if (instance->read_cb != NULL)
{
instance->read_cb((const char *)user_data->vec.iov_base, cqe->res, instance->read_cb_args);
}
}
else
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d (%d, %s)",
entry_type_to_string(user_data->type), instance->sockfd, -cqe->res, strerror(-cqe->res));
}
cqe->user_data = 0;
io_uring_cqe_seen(&instance->ring, cqe);
io_uring_submit_read_entry(instance, user_data);
break;
case ENTRY_TYPE_WRITE:
if (cqe->res > 0)
{
// data->write_cb
}
else
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d (%d, %s)",
entry_type_to_string(user_data->type), instance->sockfd, -cqe->res, strerror(-cqe->res));
}
user_data_destory(user_data);
cqe->user_data = 0;
io_uring_cqe_seen(&instance->ring, cqe);
break;
default:
user_data_destory(user_data);
cqe->user_data = 0;
io_uring_cqe_seen(&instance->ring, cqe);
break;
}
}
}
}
#else
struct io_uring_instance
{
};
struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int ring_size, int buff_size, int flags, int sq_thread_idle, int enable_debug)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return NULL;
}
void io_uring_instance_destory(struct io_uring_instance *instance)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
}
int io_uring_register_read_callback(struct io_uring_instance *instance, read_callback *read_cb, void *cb_arg)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return -1;
}
int io_uring_submit_write_entry(struct io_uring_instance *instance, const char *data, int len)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return -1;
}
int io_uring_peek_ready_entrys(struct io_uring_instance *instance)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return 0;
}
#endif