#include #include #include "tfe_utils.h" #include "io_uring.h" extern void *g_default_logger; #if (SUPPORT_LIBURING) #include enum entry_type { ENTRY_TYPE_UNKNOWN = 0, ENTRY_TYPE_READ = 1, ENTRY_TYPE_WRITE = 2, }; struct user_data { enum entry_type type; struct iovec vec; }; struct config { int ring_size; int buff_size; int flags; int sq_thread_idle; // milliseconds int enable_debug; }; struct io_uring_instance { int sockfd; int eventfd; struct config config; struct io_uring ring; struct io_uring_params params; void *read_cb_args; read_callback *read_cb; int read_buff_num; struct user_data **read_buffs; }; static const char *entry_type_to_string(enum entry_type type) { switch (type) { case ENTRY_TYPE_READ: return "read"; case ENTRY_TYPE_WRITE: return "write"; case ENTRY_TYPE_UNKNOWN: /* fall passthrough */ default: return "unknown"; } } static void user_data_destory(struct user_data *data) { if (data) { free(data); data = NULL; } } static struct user_data *user_data_create(int buff_size) { struct user_data *data = (struct user_data *)calloc(1, sizeof(struct user_data) + buff_size * sizeof(char)); data->vec.iov_base = (void *)data + sizeof(struct user_data); data->vec.iov_len = buff_size; return data; } void io_uring_instance_destory(struct io_uring_instance *instance) { if (instance) { io_uring_queue_exit(&instance->ring); if (instance->read_buffs) { for (int i = 0; i < instance->read_buff_num; i++) { if (instance->read_buffs[i]) { user_data_destory(instance->read_buffs[i]); instance->read_buffs[i] = NULL; } } free(instance->read_buffs); instance->read_buffs = NULL; } free(instance); instance = NULL; } } /* * ring_size : 1024 * buff_size : 2048 * flags : 0 * IORING_SETUP_IOPOLL(1U << 0) io_context is polled * IORING_SETUP_SQPOLL(1U << 1) SQ poll thread * IORING_SETUP_SQ_AFF(1U << 2) sq_thread_cpu is valid * ORING_SETUP_CQSIZE(1U << 3) app defines CQ size * IORING_SETUP_CLAMP(1U << 4) clamp SQ/CQ ring sizes * IORING_SETUP_ATTACH_WQ(1U << 5) attach to existing wq * IORING_SETUP_R_DISABLED(1U << 6) start with ring disabled * IORING_SETUP_SUBMIT_ALL(1U << 7) continue submit on error * sq_thread_idle : 0 */ struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int ring_size, int buff_size, int flags, int sq_thread_idle, int enable_debug) { int ret; struct io_uring_instance *instance = (struct io_uring_instance *)calloc(1, sizeof(struct io_uring_instance)); if (instance == NULL) { TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to create io_uring instance, %s", strerror(errno)); return NULL; } instance->sockfd = sockfd; instance->eventfd = eventfd; instance->config.ring_size = ring_size; instance->config.buff_size = buff_size; instance->config.flags = flags; instance->config.sq_thread_idle = sq_thread_idle; instance->config.enable_debug = enable_debug; instance->read_buff_num = 1; instance->read_buffs = (struct user_data **)calloc(instance->read_buff_num, sizeof(struct user_data *)); for (int i = 0; i < instance->read_buff_num; i++) { instance->read_buffs[i] = user_data_create(instance->config.buff_size); if (instance->read_buffs[i] == NULL) { TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to create user_data, %s", strerror(errno)); goto error_out; } } /* * 参考资料:https://unixism.net/loti/tutorial/sq_poll.html#sq-poll * * 执行下面命令进行验证 IORING_SETUP_SQPOLL: * sudo bpftrace -e 'tracepoint:io_uring:io_uring_submit_sqe {printf("%s(%d)\n", comm, pid);}' */ if (instance->config.flags) { instance->params.flags |= instance->config.flags; } if (instance->config.sq_thread_idle) { instance->params.sq_thread_idle = instance->config.sq_thread_idle; // milliseconds } ret = io_uring_queue_init_params(ring_size, &instance->ring, &instance->params); if (ret) { TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to init io_uring queue params, %s", strerror(-ret)); goto error_out; } if (instance->eventfd > 0) { ret = io_uring_register_eventfd(&instance->ring, instance->eventfd); if (ret) { TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to register eventfd for io_uring, %s", strerror(-ret)); goto error_out; } } return instance; error_out: io_uring_instance_destory(instance); return NULL; } // return 0 : success // reutrn -1 : error int io_uring_submit_read_entry(struct io_uring_instance *instance, struct user_data *data) { struct io_uring_sqe *sqe = io_uring_get_sqe(&instance->ring); if (sqe == NULL) { TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to get io_uring sqe, io_uring submission queue is full, drop read type"); return -1; } data->type = ENTRY_TYPE_READ; io_uring_prep_readv(sqe, instance->sockfd, &data->vec, 1, 0); io_uring_sqe_set_data(sqe, data); if (instance->config.enable_debug) { TFE_LOG_DEBUG(g_default_logger, "IO_URING: submit read entry: sockfd: %02d sqe: %p user_data: %p iov_base: %p iovec_len: %04ld", instance->sockfd, sqe, (void *)sqe->user_data, data->vec.iov_base, data->vec.iov_len); } io_uring_submit(&instance->ring); return 0; } // return 0 : success // reutrn -1 : error int io_uring_submit_write_entry(struct io_uring_instance *instance, const char *data, int len) { struct io_uring_sqe *sqe = io_uring_get_sqe(&instance->ring); if (sqe == NULL) { TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to get io_uring sqe, io_uring submission queue is full, drop write type"); return -1; } if (len > instance->config.buff_size) { TFE_LOG_ERROR(g_default_logger, "IO_URING: the length of the sent data [%d] is greater than the capacity of the io_uring buffer [%d]", len, instance->config.buff_size); return -1; } struct user_data *user_data = user_data_create(instance->config.buff_size); user_data->type = ENTRY_TYPE_WRITE; user_data->vec.iov_len = len; memcpy(user_data->vec.iov_base, data, len); io_uring_prep_writev(sqe, instance->sockfd, &user_data->vec, 1, 0); io_uring_sqe_set_data(sqe, user_data); if (instance->config.enable_debug) { TFE_LOG_DEBUG(g_default_logger, "IO_URING: submit write entry: sockfd: %02d sqe: %p user_data: %p iov_base: %p iovec_len: %04ld", instance->sockfd, sqe, (void *)sqe->user_data, user_data->vec.iov_base, user_data->vec.iov_len); } io_uring_submit(&instance->ring); return 0; } // return 0 : success // reutrn -1 : error int io_uring_register_read_callback(struct io_uring_instance *instance, read_callback *read_cb, void *read_cb_args) { if (instance->read_cb) { TFE_LOG_ERROR(g_default_logger, "IO_URING: cannot register read callback multiple times"); return -1; } instance->read_cb = read_cb; instance->read_cb_args = read_cb_args; for (int i = 0; i < instance->read_buff_num; i++) { struct user_data *data = instance->read_buffs[i]; if (io_uring_submit_read_entry(instance, data) == -1) { return -1; } } return 0; } // returns the number of processed entrys int io_uring_peek_ready_entrys(struct io_uring_instance *instance) { int ret = 0; int total = 0; struct io_uring_cqe *cqes[MAX_BATCH_CQE_NUM]; while (1) { ret = io_uring_peek_batch_cqe(&instance->ring, cqes, MAX_BATCH_CQE_NUM); if (ret <= 0) { return total; } total += ret; for (int i = 0; i < ret; i++) { struct io_uring_cqe *cqe = cqes[i]; if (cqe == NULL || (void *)cqe->user_data == NULL) { // TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to get io_uring cqe, null is return"); continue; } struct user_data *user_data = (struct user_data *)cqe->user_data; if (instance->config.enable_debug) { TFE_LOG_DEBUG(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d cqe: %p user_data: %p iov_base: %p iovec_len: %04ld cqe->res: %04d", entry_type_to_string(user_data->type), instance->sockfd, cqe, (void *)user_data, user_data->vec.iov_base, user_data->vec.iov_len, cqe->res); } switch (user_data->type) { case ENTRY_TYPE_READ: if (cqe->res > 0) { if (instance->read_cb != NULL) { instance->read_cb((const char *)user_data->vec.iov_base, cqe->res, instance->read_cb_args); } } else { TFE_LOG_ERROR(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d (%d, %s)", entry_type_to_string(user_data->type), instance->sockfd, -cqe->res, strerror(-cqe->res)); } cqe->user_data = 0; io_uring_cqe_seen(&instance->ring, cqe); io_uring_submit_read_entry(instance, user_data); break; case ENTRY_TYPE_WRITE: if (cqe->res > 0) { // data->write_cb } else { TFE_LOG_ERROR(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d (%d, %s)", entry_type_to_string(user_data->type), instance->sockfd, -cqe->res, strerror(-cqe->res)); } user_data_destory(user_data); cqe->user_data = 0; io_uring_cqe_seen(&instance->ring, cqe); break; default: user_data_destory(user_data); cqe->user_data = 0; io_uring_cqe_seen(&instance->ring, cqe); break; } } } } #else struct io_uring_instance { }; struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int ring_size, int buff_size, int flags, int sq_thread_idle, int enable_debug) { TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system"); return NULL; } void io_uring_instance_destory(struct io_uring_instance *instance) { TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system"); } int io_uring_register_read_callback(struct io_uring_instance *instance, read_callback *read_cb, void *cb_arg) { TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system"); return -1; } int io_uring_submit_write_entry(struct io_uring_instance *instance, const char *data, int len) { TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system"); return -1; } int io_uring_peek_ready_entrys(struct io_uring_instance *instance) { TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system"); return 0; } #endif