perf: 性能优化

* io_uring使用buffer pool避免内存分配与释放
    * packet io thread与worker thread无锁访问cmsg
    * 为解密流量的fd设置默认的TTL
This commit is contained in:
luwenpeng
2023-07-14 19:38:18 +08:00
parent 2b00650d3e
commit c3b887f1c5
19 changed files with 935 additions and 939 deletions

View File

@@ -6,19 +6,35 @@
extern void *g_default_logger;
#if (SUPPORT_LIBURING)
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <sys/uio.h>
#include <liburing.h>
enum entry_type
enum ioevent
{
ENTRY_TYPE_UNKNOWN = 0,
ENTRY_TYPE_READ = 1,
ENTRY_TYPE_WRITE = 2,
IOEVENT_UNKNOWN = 0,
IOEVENT_READ = 1,
IOEVENT_WRITE = 2,
};
struct user_data
struct iobuffer
{
enum entry_type type;
struct iovec vec;
enum ioevent event;
struct iobuffer *next;
};
struct iobuffer_pool
{
int buffer_size;
int buffer_num;
int buffer_used;
int buffer_left;
struct iobuffer *free_list;
};
struct config
@@ -34,74 +50,239 @@ struct io_uring_instance
{
int sockfd;
int eventfd;
struct config config;
struct io_uring ring;
struct io_uring_params params;
void *read_cb_args;
read_callback *read_cb;
int read_buff_num;
struct user_data **read_buffs;
void *cb_args;
io_uring_read_cb *read_cb;
struct iobuffer_pool *pool;
};
static const char *entry_type_to_string(enum entry_type type)
/******************************************************************************
*
******************************************************************************/
static const char *ioevent_tostring(enum ioevent event);
static struct iobuffer *iobuffer_create(int buffer_size);
static void iobuffer_destory(struct iobuffer *buffer);
struct iobuffer_pool *iobuffer_pool_create(int buffer_size, int buffer_num);
void iobuffer_pool_destory(struct iobuffer_pool *pool);
void iobuffer_pool_print(struct iobuffer_pool *pool);
struct iobuffer *iobuffer_pool_pop(struct iobuffer_pool *pool);
void iobuffer_pool_push(struct iobuffer_pool *pool, struct iobuffer *buffer);
struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int ring_size, int buff_size, int flags, int sq_thread_idle, int enable_debug);
void io_uring_instance_destory(struct io_uring_instance *instance);
static int io_uring_read(struct io_uring_instance *instance, struct iobuffer *buffer);
int io_uring_write(struct io_uring_instance *instance, const char *data, int len);
int io_uring_set_read_cb(struct io_uring_instance *instance, io_uring_read_cb *read_cb, void *cb_args);
int io_uring_polling(struct io_uring_instance *instance);
/******************************************************************************
*
******************************************************************************/
static const char *ioevent_tostring(enum ioevent event)
{
switch (type)
switch (event)
{
case ENTRY_TYPE_READ:
case IOEVENT_READ:
return "read";
case ENTRY_TYPE_WRITE:
case IOEVENT_WRITE:
return "write";
case ENTRY_TYPE_UNKNOWN: /* fall passthrough */
case IOEVENT_UNKNOWN: /* fall passthrough */
default:
return "unknown";
}
}
static void user_data_destory(struct user_data *data)
static struct iobuffer *iobuffer_create(int buffer_size)
{
if (data)
struct iobuffer *buffer = (struct iobuffer *)calloc(1, sizeof(struct iobuffer));
if (buffer == NULL)
{
free(data);
data = NULL;
goto error_out;
}
}
static struct user_data *user_data_create(int buff_size)
{
struct user_data *data = (struct user_data *)calloc(1, sizeof(struct user_data) + buff_size * sizeof(char));
data->vec.iov_base = (void *)data + sizeof(struct user_data);
data->vec.iov_len = buff_size;
return data;
}
void io_uring_instance_destory(struct io_uring_instance *instance)
{
if (instance)
buffer->vec.iov_len = buffer_size;
buffer->vec.iov_base = (void *)calloc(buffer->vec.iov_len, sizeof(char));
if (buffer->vec.iov_base == NULL)
{
io_uring_queue_exit(&instance->ring);
goto error_out;
}
buffer->event = IOEVENT_UNKNOWN;
buffer->next = NULL;
if (instance->read_buffs)
return buffer;
error_out:
iobuffer_destory(buffer);
return NULL;
}
static void iobuffer_destory(struct iobuffer *buffer)
{
if (buffer)
{
if (buffer->vec.iov_base)
{
for (int i = 0; i < instance->read_buff_num; i++)
{
if (instance->read_buffs[i])
{
user_data_destory(instance->read_buffs[i]);
instance->read_buffs[i] = NULL;
}
}
free(instance->read_buffs);
instance->read_buffs = NULL;
free(buffer->vec.iov_base);
buffer->vec.iov_base = NULL;
}
free(instance);
instance = NULL;
free(buffer);
buffer = NULL;
}
}
struct iobuffer_pool *iobuffer_pool_create(int buffer_size, int buffer_num)
{
struct iobuffer *head = NULL;
struct iobuffer *tail = NULL;
struct iobuffer *node = NULL;
struct iobuffer_pool *pool = (struct iobuffer_pool *)calloc(1, sizeof(struct iobuffer_pool));
if (pool == NULL)
{
goto error_out;
}
pool->buffer_size = buffer_size;
pool->buffer_used = 0;
for (int i = 0; i < buffer_num; i++)
{
node = iobuffer_create(pool->buffer_size);
if (node == NULL)
{
goto error_out;
}
if (head == NULL)
{
head = node;
tail = node;
}
else
{
tail->next = node;
tail = node;
}
pool->buffer_left++;
pool->buffer_num++;
}
pool->free_list = head;
return pool;
error_out:
iobuffer_pool_destory(pool);
return NULL;
}
void iobuffer_pool_destory(struct iobuffer_pool *pool)
{
if (pool)
{
struct iobuffer *next = NULL;
struct iobuffer *node = pool->free_list;
while (node)
{
next = node->next;
iobuffer_destory(node);
node = next;
}
free(pool);
pool = NULL;
}
}
void iobuffer_pool_print(struct iobuffer_pool *pool)
{
if (pool)
{
printf(" pool->buffer_size : %d\n", pool->buffer_size);
printf(" pool->buffer_num : %d\n", pool->buffer_num);
printf(" pool->buffer_used : %d\n", pool->buffer_used);
printf(" pool->buffer_left : %d\n", pool->buffer_left);
printf(" pool->free_list : \n");
struct iobuffer *node = pool->free_list;
while (node)
{
printf(" node : %p\n", node);
printf(" node->next : %p\n", node->next);
node = node->next;
}
printf("\n");
}
}
struct iobuffer *iobuffer_pool_pop(struct iobuffer_pool *pool)
{
struct iobuffer *buffer = NULL;
if (pool == NULL)
{
assert(0);
return NULL;
}
if (pool->buffer_left <= 0)
{
buffer = iobuffer_create(pool->buffer_size);
if (buffer)
{
pool->buffer_used++;
return buffer;
}
else
{
return NULL;
}
}
if (pool->free_list == NULL)
{
assert(0);
return NULL;
}
buffer = pool->free_list;
pool->free_list = buffer->next;
buffer->next = NULL;
pool->buffer_used++;
pool->buffer_left--;
return buffer;
}
void iobuffer_pool_push(struct iobuffer_pool *pool, struct iobuffer *buffer)
{
if (pool == NULL || buffer == NULL)
{
return;
}
if (pool->buffer_left >= pool->buffer_num)
{
iobuffer_destory(buffer);
pool->buffer_used--;
return;
}
buffer->event = IOEVENT_UNKNOWN;
buffer->next = pool->free_list;
pool->free_list = buffer;
pool->buffer_left++;
pool->buffer_used--;
}
/*
* ring_size : 1024
* buff_size : 2048
@@ -134,16 +315,11 @@ struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int
instance->config.sq_thread_idle = sq_thread_idle;
instance->config.enable_debug = enable_debug;
instance->read_buff_num = 1;
instance->read_buffs = (struct user_data **)calloc(instance->read_buff_num, sizeof(struct user_data *));
for (int i = 0; i < instance->read_buff_num; i++)
instance->pool = iobuffer_pool_create(instance->config.buff_size, instance->config.ring_size);
if (instance->pool == NULL)
{
instance->read_buffs[i] = user_data_create(instance->config.buff_size);
if (instance->read_buffs[i] == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to create user_data, %s", strerror(errno));
goto error_out;
}
TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to create io buffer pool, %s", strerror(errno));
goto error_out;
}
/*
@@ -186,9 +362,20 @@ error_out:
return NULL;
}
void io_uring_instance_destory(struct io_uring_instance *instance)
{
if (instance)
{
io_uring_queue_exit(&instance->ring);
iobuffer_pool_destory(instance->pool);
free(instance);
instance = NULL;
}
}
// return 0 : success
// reutrn -1 : error
int io_uring_submit_read_entry(struct io_uring_instance *instance, struct user_data *data)
static int io_uring_read(struct io_uring_instance *instance, struct iobuffer *buffer)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&instance->ring);
if (sqe == NULL)
@@ -197,14 +384,14 @@ int io_uring_submit_read_entry(struct io_uring_instance *instance, struct user_d
return -1;
}
data->type = ENTRY_TYPE_READ;
io_uring_prep_readv(sqe, instance->sockfd, &data->vec, 1, 0);
io_uring_sqe_set_data(sqe, data);
buffer->event = IOEVENT_READ;
io_uring_prep_readv(sqe, instance->sockfd, &buffer->vec, 1, 0);
io_uring_sqe_set_data(sqe, buffer);
if (instance->config.enable_debug)
{
TFE_LOG_DEBUG(g_default_logger, "IO_URING: submit read entry: sockfd: %02d sqe: %p user_data: %p iov_base: %p iovec_len: %04ld",
instance->sockfd, sqe, (void *)sqe->user_data, data->vec.iov_base, data->vec.iov_len);
instance->sockfd, sqe, (void *)sqe->user_data, buffer->vec.iov_base, buffer->vec.iov_len);
}
io_uring_submit(&instance->ring);
@@ -213,7 +400,7 @@ int io_uring_submit_read_entry(struct io_uring_instance *instance, struct user_d
// return 0 : success
// reutrn -1 : error
int io_uring_submit_write_entry(struct io_uring_instance *instance, const char *data, int len)
int io_uring_write(struct io_uring_instance *instance, const char *data, int len)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&instance->ring);
if (sqe == NULL)
@@ -228,17 +415,23 @@ int io_uring_submit_write_entry(struct io_uring_instance *instance, const char *
return -1;
}
struct user_data *user_data = user_data_create(instance->config.buff_size);
user_data->type = ENTRY_TYPE_WRITE;
user_data->vec.iov_len = len;
memcpy(user_data->vec.iov_base, data, len);
io_uring_prep_writev(sqe, instance->sockfd, &user_data->vec, 1, 0);
io_uring_sqe_set_data(sqe, user_data);
struct iobuffer *buffer = iobuffer_pool_pop(instance->pool);
if (buffer == NULL)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: cannot get fixed buffer");
return -1;
}
buffer->event = IOEVENT_WRITE;
buffer->vec.iov_len = len;
memcpy(buffer->vec.iov_base, data, len);
io_uring_prep_writev(sqe, instance->sockfd, &buffer->vec, 1, 0);
io_uring_sqe_set_data(sqe, buffer);
if (instance->config.enable_debug)
{
TFE_LOG_DEBUG(g_default_logger, "IO_URING: submit write entry: sockfd: %02d sqe: %p user_data: %p iov_base: %p iovec_len: %04ld",
instance->sockfd, sqe, (void *)sqe->user_data, user_data->vec.iov_base, user_data->vec.iov_len);
instance->sockfd, sqe, (void *)sqe->user_data, buffer->vec.iov_base, buffer->vec.iov_len);
}
io_uring_submit(&instance->ring);
@@ -247,7 +440,7 @@ int io_uring_submit_write_entry(struct io_uring_instance *instance, const char *
// return 0 : success
// reutrn -1 : error
int io_uring_register_read_callback(struct io_uring_instance *instance, read_callback *read_cb, void *read_cb_args)
int io_uring_set_read_cb(struct io_uring_instance *instance, io_uring_read_cb *read_cb, void *cb_args)
{
if (instance->read_cb)
{
@@ -256,30 +449,42 @@ int io_uring_register_read_callback(struct io_uring_instance *instance, read_cal
}
instance->read_cb = read_cb;
instance->read_cb_args = read_cb_args;
instance->cb_args = cb_args;
for (int i = 0; i < instance->read_buff_num; i++)
struct iobuffer *buffer = iobuffer_pool_pop(instance->pool);
if (buffer == NULL)
{
struct user_data *data = instance->read_buffs[i];
if (io_uring_submit_read_entry(instance, data) == -1)
{
return -1;
}
TFE_LOG_ERROR(g_default_logger, "IO_URING: cannot get fixed buffer");
return -1;
}
if (io_uring_read(instance, buffer) == -1)
{
iobuffer_pool_push(instance->pool, buffer);
return -1;
}
return 0;
}
// returns the number of processed entrys
int io_uring_peek_ready_entrys(struct io_uring_instance *instance)
int io_uring_polling(struct io_uring_instance *instance)
{
int ret = 0;
int total = 0;
struct io_uring_cqe *cqes[MAX_BATCH_CQE_NUM];
struct io_uring_cqe *cqe = NULL;
struct iobuffer *buffer = NULL;
void *cb_args = instance->cb_args;
struct io_uring *ring = &instance->ring;
struct iobuffer_pool *pool = instance->pool;
io_uring_read_cb *read_cb = instance->read_cb;
int enable_debug = instance->config.enable_debug;
while (1)
{
ret = io_uring_peek_batch_cqe(&instance->ring, cqes, MAX_BATCH_CQE_NUM);
ret = io_uring_peek_batch_cqe(ring, cqes, MAX_BATCH_CQE_NUM);
if (ret <= 0)
{
return total;
@@ -288,63 +493,58 @@ int io_uring_peek_ready_entrys(struct io_uring_instance *instance)
total += ret;
for (int i = 0; i < ret; i++)
{
struct io_uring_cqe *cqe = cqes[i];
cqe = cqes[i];
if (cqe == NULL || (void *)cqe->user_data == NULL)
{
// TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to get io_uring cqe, null is return");
continue;
}
struct user_data *user_data = (struct user_data *)cqe->user_data;
if (instance->config.enable_debug)
buffer = (struct iobuffer *)cqe->user_data;
if (enable_debug)
{
TFE_LOG_DEBUG(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d cqe: %p user_data: %p iov_base: %p iovec_len: %04ld cqe->res: %04d",
entry_type_to_string(user_data->type), instance->sockfd, cqe, (void *)user_data, user_data->vec.iov_base, user_data->vec.iov_len, cqe->res);
ioevent_tostring(buffer->event), instance->sockfd, cqe, (void *)buffer, buffer->vec.iov_base, buffer->vec.iov_len, cqe->res);
}
switch (user_data->type)
switch (buffer->event)
{
case ENTRY_TYPE_READ:
case IOEVENT_READ:
if (cqe->res > 0)
{
if (instance->read_cb != NULL)
if (read_cb)
{
instance->read_cb((const char *)user_data->vec.iov_base, cqe->res, instance->read_cb_args);
read_cb((const char *)buffer->vec.iov_base, cqe->res, cb_args);
}
}
else
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d (%d, %s)",
entry_type_to_string(user_data->type), instance->sockfd, -cqe->res, strerror(-cqe->res));
}
cqe->user_data = 0;
io_uring_cqe_seen(&instance->ring, cqe);
io_uring_submit_read_entry(instance, user_data);
io_uring_cqe_seen(ring, cqe);
io_uring_read(instance, buffer);
break;
case ENTRY_TYPE_WRITE:
if (cqe->res > 0)
{
// data->write_cb
}
else
case IOEVENT_WRITE:
if (cqe->res < 0)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d (%d, %s)",
entry_type_to_string(user_data->type), instance->sockfd, -cqe->res, strerror(-cqe->res));
ioevent_tostring(buffer->event), instance->sockfd, -cqe->res, strerror(-cqe->res));
}
user_data_destory(user_data);
iobuffer_pool_push(pool, buffer);
cqe->user_data = 0;
io_uring_cqe_seen(&instance->ring, cqe);
io_uring_cqe_seen(ring, cqe);
break;
default:
user_data_destory(user_data);
iobuffer_pool_push(pool, buffer);
cqe->user_data = 0;
io_uring_cqe_seen(&instance->ring, cqe);
io_uring_cqe_seen(ring, cqe);
break;
}
}
if (ret < MAX_BATCH_CQE_NUM)
{
return total;
}
}
}
#else
struct io_uring_instance
{
};
@@ -360,23 +560,21 @@ void io_uring_instance_destory(struct io_uring_instance *instance)
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
}
int io_uring_register_read_callback(struct io_uring_instance *instance, read_callback *read_cb, void *cb_arg)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return -1;
}
int io_uring_submit_write_entry(struct io_uring_instance *instance, const char *data, int len)
int io_uring_set_read_cb(struct io_uring_instance *instance, io_uring_read_cb *read_cb, void *cb_arg)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return -1;
}
int io_uring_peek_ready_entrys(struct io_uring_instance *instance)
int io_uring_write(struct io_uring_instance *instance, const char *data, int len)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return -1;
}
int io_uring_polling(struct io_uring_instance *instance)
{
TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system");
return 0;
}
#endif