TSG-10708 KNI使用io_uring替换read/write以实现I/O加速
* support iouring: 按需动态分配内存
This commit is contained in:
@@ -30,6 +30,7 @@ stages:
|
|||||||
- dnf --enablerepo=powertools install -y clang
|
- dnf --enablerepo=powertools install -y clang
|
||||||
- dnf --enablerepo=powertools install -y llvm
|
- dnf --enablerepo=powertools install -y llvm
|
||||||
- dnf --enablerepo=powertools install -y libbpf-devel
|
- dnf --enablerepo=powertools install -y libbpf-devel
|
||||||
|
- dnf --enablerepo=powertools install -y liburing-devel
|
||||||
- chmod +x ./ci/travis.sh
|
- chmod +x ./ci/travis.sh
|
||||||
script:
|
script:
|
||||||
- yum makecache
|
- yum makecache
|
||||||
|
|||||||
@@ -28,6 +28,14 @@ elseif(ASAN_OPTION MATCHES "THREAD")
|
|||||||
endif()
|
endif()
|
||||||
# end of for ASAN
|
# end of for ASAN
|
||||||
|
|
||||||
|
find_package(LIBURING)
|
||||||
|
if (SUPPORT_LIBURING)
|
||||||
|
add_definitions(-DSUPPORT_LIBURING=1)
|
||||||
|
message(STATUS "Support liburing")
|
||||||
|
else()
|
||||||
|
message(STATUS "Not support liburing")
|
||||||
|
endif()
|
||||||
|
|
||||||
find_package(LIBBPF)
|
find_package(LIBBPF)
|
||||||
if (SUPPORT_BPF)
|
if (SUPPORT_BPF)
|
||||||
add_definitions(-DSUPPORT_BPF=1)
|
add_definitions(-DSUPPORT_BPF=1)
|
||||||
|
|||||||
8
cmake/FindLIBURING.cmake
Normal file
8
cmake/FindLIBURING.cmake
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
include(FindPkgConfig)
|
||||||
|
pkg_check_modules(LIBURING liburing)
|
||||||
|
|
||||||
|
if(LIBURING_FOUND)
|
||||||
|
set(SUPPORT_LIBURING true)
|
||||||
|
else()
|
||||||
|
set(SUPPORT_LIBURING false)
|
||||||
|
endif()
|
||||||
@@ -22,6 +22,23 @@ bpf_debug_log=0
|
|||||||
# 4: BPF 使用四元组分流
|
# 4: BPF 使用四元组分流
|
||||||
bpf_hash_mode=2
|
bpf_hash_mode=2
|
||||||
|
|
||||||
|
[io_uring]
|
||||||
|
enable_iouring=1
|
||||||
|
enable_debuglog=0
|
||||||
|
ring_size=1024
|
||||||
|
buff_size=2048
|
||||||
|
# io_uring_setup() flags
|
||||||
|
# IORING_SETUP_IOPOLL (1U << 0) /* io_context is polled */
|
||||||
|
# IORING_SETUP_SQPOLL (1U << 1) /* SQ poll thread */
|
||||||
|
# IORING_SETUP_SQ_AFF (1U << 2) /* sq_thread_cpu is valid */
|
||||||
|
# IORING_SETUP_CQSIZE (1U << 3) /* app defines CQ size */
|
||||||
|
# IORING_SETUP_CLAMP (1U << 4) /* clamp SQ/CQ ring sizes */
|
||||||
|
# IORING_SETUP_ATTACH_WQ (1U << 5) /* attach to existing wq */
|
||||||
|
# IORING_SETUP_R_DISABLED (1U << 6) /* start with ring disabled */
|
||||||
|
# IORING_SETUP_SUBMIT_ALL (1U << 7) /* continue submit on error */
|
||||||
|
flags=0
|
||||||
|
sq_thread_idle=0
|
||||||
|
|
||||||
[tfe0]
|
[tfe0]
|
||||||
enabled = 1
|
enabled = 1
|
||||||
dev_eth_symbol = ens1f5
|
dev_eth_symbol = ens1f5
|
||||||
|
|||||||
@@ -1,11 +1,15 @@
|
|||||||
set(CMAKE_INSTALL_PREFIX /home/mesasoft/sapp_run)
|
set(CMAKE_INSTALL_PREFIX /home/mesasoft/sapp_run)
|
||||||
add_library(kni SHARED src/kni_entry.cpp src/tfe_mgr.cpp src/kni_tap_rss.cpp src/kni_pxy_tcp_option.cpp src/kni_dynamic_bypass.cpp)
|
add_library(kni SHARED src/kni_entry.cpp src/tfe_mgr.cpp src/kni_tap_rss.cpp src/kni_iouring.cpp src/kni_pxy_tcp_option.cpp src/kni_dynamic_bypass.cpp)
|
||||||
target_include_directories(kni PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)
|
target_include_directories(kni PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)
|
||||||
|
|
||||||
|
target_link_libraries(kni common MESA_prof_load MESA_htable MESA_field_stat maatframe mrzcpd dabloom)
|
||||||
|
|
||||||
if (SUPPORT_BPF)
|
if (SUPPORT_BPF)
|
||||||
target_link_libraries(kni common MESA_prof_load MESA_htable MESA_field_stat maatframe mrzcpd dabloom bpf)
|
target_link_libraries(kni bpf)
|
||||||
else()
|
endif()
|
||||||
target_link_libraries(kni common MESA_prof_load MESA_htable MESA_field_stat maatframe mrzcpd dabloom)
|
|
||||||
|
if (SUPPORT_LIBURING)
|
||||||
|
target_link_libraries(kni uring)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
install(TARGETS kni LIBRARY DESTINATION ${CMAKE_INSTALL_PREFIX}/plug/business/kni COMPONENT LIBRARIES)
|
install(TARGETS kni LIBRARY DESTINATION ${CMAKE_INSTALL_PREFIX}/plug/business/kni COMPONENT LIBRARIES)
|
||||||
|
|||||||
@@ -7,6 +7,7 @@
|
|||||||
#include <tsg/tsg_statistic.h>
|
#include <tsg/tsg_statistic.h>
|
||||||
#include "tfe_mgr.h"
|
#include "tfe_mgr.h"
|
||||||
#include <tsg/tsg_label.h>
|
#include <tsg/tsg_label.h>
|
||||||
|
#include "kni_iouring.h"
|
||||||
|
|
||||||
#define BURST_MAX 1
|
#define BURST_MAX 1
|
||||||
#define CALLER_SAPP 0
|
#define CALLER_SAPP 0
|
||||||
@@ -196,6 +197,9 @@ struct per_thread_handle{
|
|||||||
MESA_htable_handle tuple2stream_htable;
|
MESA_htable_handle tuple2stream_htable;
|
||||||
MESA_htable_handle traceid2sslinfo_htable;
|
MESA_htable_handle traceid2sslinfo_htable;
|
||||||
struct expiry_dablooms_handle *dabloom_handle;
|
struct expiry_dablooms_handle *dabloom_handle;
|
||||||
|
#if (SUPPORT_LIBURING)
|
||||||
|
struct io_uring_handle *iouring_handle;
|
||||||
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
struct tuple2stream_htable_value{
|
struct tuple2stream_htable_value{
|
||||||
@@ -211,6 +215,7 @@ struct security_policy_shunt_tsg_diagnose{
|
|||||||
};
|
};
|
||||||
|
|
||||||
struct kni_handle{
|
struct kni_handle{
|
||||||
|
struct io_uring_conf iouring_conf;
|
||||||
struct kni_marsio_handle *marsio_handle;
|
struct kni_marsio_handle *marsio_handle;
|
||||||
struct bpf_ctx *tap_bpf_ctx;
|
struct bpf_ctx *tap_bpf_ctx;
|
||||||
struct kni_maat_handle *maat_handle;
|
struct kni_maat_handle *maat_handle;
|
||||||
|
|||||||
69
entry/include/kni_iouring.h
Normal file
69
entry/include/kni_iouring.h
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
#ifndef _KNI_IOURING_H_
|
||||||
|
#define _KNI_IOURING_H_
|
||||||
|
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
|
||||||
|
#if (SUPPORT_LIBURING)
|
||||||
|
#include <liburing.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C"
|
||||||
|
{
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#define MAX_BATCH_CQE_NUM 128
|
||||||
|
|
||||||
|
struct io_uring_conf
|
||||||
|
{
|
||||||
|
int enable_iouring;
|
||||||
|
int enable_debuglog;
|
||||||
|
|
||||||
|
int ring_size;
|
||||||
|
int buff_size;
|
||||||
|
|
||||||
|
int flags;
|
||||||
|
int sq_thread_idle; // milliseconds
|
||||||
|
};
|
||||||
|
|
||||||
|
#if (SUPPORT_LIBURING)
|
||||||
|
enum evtype
|
||||||
|
{
|
||||||
|
EVTYPE_UNKNOWN = 0,
|
||||||
|
EVTYPE_READ = 1,
|
||||||
|
EVTYPE_WRITE = 2,
|
||||||
|
};
|
||||||
|
|
||||||
|
struct user_data
|
||||||
|
{
|
||||||
|
int sockfd;
|
||||||
|
enum evtype type;
|
||||||
|
struct iovec vec;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct io_uring_handle
|
||||||
|
{
|
||||||
|
struct io_uring ring;
|
||||||
|
struct io_uring_params params;
|
||||||
|
|
||||||
|
int ring_size;
|
||||||
|
int buff_size;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct user_data *io_uring_user_data_create(int sockfd, enum evtype type, int buff_size);
|
||||||
|
void io_uring_user_data_destory(struct user_data *conn);
|
||||||
|
|
||||||
|
struct io_uring_handle *io_uring_handle_create(int ring_size, int buff_size, int flags, int sq_thread_idle);
|
||||||
|
void io_uring_handle_destory(struct io_uring_handle *handle);
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
||||||
@@ -6,7 +6,7 @@ extern "C"
|
|||||||
{
|
{
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define TAP_RSS_LOG_TAG "TAP_RSS :"
|
#define TAP_RSS_LOG_TAG "TAP_RSS: "
|
||||||
|
|
||||||
struct bpf_ctx;
|
struct bpf_ctx;
|
||||||
|
|
||||||
|
|||||||
@@ -64,6 +64,9 @@ struct kni_handle *g_kni_handle = NULL;
|
|||||||
struct kni_field_stat_handle *g_kni_fs_handle = NULL;
|
struct kni_field_stat_handle *g_kni_fs_handle = NULL;
|
||||||
int *arr_last_tfe_dispatch_index = NULL;
|
int *arr_last_tfe_dispatch_index = NULL;
|
||||||
|
|
||||||
|
static int io_uring_add_write(int thread_seq, char *raw_data, uint16_t raw_len, addr_type_t addr_type);
|
||||||
|
static int io_uring_add_read(int thread_seq);
|
||||||
|
static int io_uring_polling(int thread_seq);
|
||||||
|
|
||||||
static char* stream_errmsg_session_record(enum intercept_error _errno){
|
static char* stream_errmsg_session_record(enum intercept_error _errno){
|
||||||
switch(_errno){
|
switch(_errno){
|
||||||
@@ -808,14 +811,23 @@ static int send_to_tfe_normal_mode(char *raw_data, uint16_t raw_len, int thread_
|
|||||||
|
|
||||||
static int send_to_tfe_tap_mode(char *raw_data, uint16_t raw_len, int thread_seq, addr_type_t addr_type, void *logger)
|
static int send_to_tfe_tap_mode(char *raw_data, uint16_t raw_len, int thread_seq, addr_type_t addr_type, void *logger)
|
||||||
{
|
{
|
||||||
int tap_fd = g_kni_handle->threads_handle[thread_seq].tap_fd;
|
if (g_kni_handle->iouring_conf.enable_iouring)
|
||||||
char *send_buff = g_kni_handle->threads_handle[thread_seq].buff;
|
|
||||||
|
|
||||||
add_ether_header(send_buff, raw_data, raw_len, addr_type);
|
|
||||||
int ret = kni_tap_write_per_thread(tap_fd, send_buff, raw_len + sizeof(struct ethhdr), logger);
|
|
||||||
if (ret < 0)
|
|
||||||
{
|
{
|
||||||
return -1;
|
if (io_uring_add_write(thread_seq, raw_data, raw_len, addr_type) == -1)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
int tap_fd = g_kni_handle->threads_handle[thread_seq].tap_fd;
|
||||||
|
char *send_buff = g_kni_handle->threads_handle[thread_seq].buff;
|
||||||
|
add_ether_header(send_buff, raw_data, raw_len, addr_type);
|
||||||
|
int ret = kni_tap_write_per_thread(tap_fd, send_buff, raw_len + sizeof(struct ethhdr), logger);
|
||||||
|
if (ret < 0)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
@@ -1978,21 +1990,35 @@ extern "C" char kni_polling_all_entry(const struct streaminfo *stream, void** pm
|
|||||||
else if (g_kni_handle->deploy_mode == KNI_DEPLOY_MODE_TAP)
|
else if (g_kni_handle->deploy_mode == KNI_DEPLOY_MODE_TAP)
|
||||||
{
|
{
|
||||||
int tap_fd = g_kni_handle->threads_handle[thread_seq].tap_fd;
|
int tap_fd = g_kni_handle->threads_handle[thread_seq].tap_fd;
|
||||||
char *recv_buff = g_kni_handle->threads_handle[thread_seq].buff;
|
if (g_kni_handle->iouring_conf.enable_iouring)
|
||||||
int recv_buff_size = g_kni_handle->threads_handle[thread_seq].buff_size;
|
|
||||||
|
|
||||||
// 只支持一个 TFE, 每次最多收 100 个包
|
|
||||||
for (int j = 0; j < 100; j++)
|
|
||||||
{
|
{
|
||||||
if (kni_tap_read_per_thread(tap_fd, recv_buff, recv_buff_size, logger) > 0)
|
if (io_uring_polling(thread_seq) == -1)
|
||||||
{
|
{
|
||||||
struct ethhdr *ether_hdr = (struct ethhdr *)recv_buff;
|
flag = POLLING_STATE_IDLE;
|
||||||
tuple2stream_htable_search(tuple2stream_htable, ether_hdr, thread_seq);
|
|
||||||
flag = POLLING_STATE_WORK;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
break;
|
flag = POLLING_STATE_WORK;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
char *recv_buff = g_kni_handle->threads_handle[thread_seq].buff;
|
||||||
|
int recv_buff_size = g_kni_handle->threads_handle[thread_seq].buff_size;
|
||||||
|
|
||||||
|
// 只支持一个 TFE, 每次最多收 100 个包
|
||||||
|
for (int j = 0; j < 100; j++)
|
||||||
|
{
|
||||||
|
if (kni_tap_read_per_thread(tap_fd, recv_buff, recv_buff_size, logger) > 0)
|
||||||
|
{
|
||||||
|
struct ethhdr *ether_hdr = (struct ethhdr *)recv_buff;
|
||||||
|
tuple2stream_htable_search(tuple2stream_htable, ether_hdr, thread_seq);
|
||||||
|
flag = POLLING_STATE_WORK;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -2669,6 +2695,13 @@ extern "C" int kni_init(){
|
|||||||
goto error_out;
|
goto error_out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MESA_load_profile_int_nodef(profile, "io_uring", "enable_iouring", (int *)&g_kni_handle->iouring_conf.enable_iouring);
|
||||||
|
MESA_load_profile_int_nodef(profile, "io_uring", "enable_debuglog", (int *)&g_kni_handle->iouring_conf.enable_debuglog);
|
||||||
|
MESA_load_profile_int_nodef(profile, "io_uring", "ring_size", (int *)&g_kni_handle->iouring_conf.ring_size);
|
||||||
|
MESA_load_profile_int_nodef(profile, "io_uring", "buff_size", (int *)&g_kni_handle->iouring_conf.buff_size);
|
||||||
|
MESA_load_profile_int_nodef(profile, "io_uring", "flags", (int *)&g_kni_handle->iouring_conf.flags);
|
||||||
|
MESA_load_profile_int_nodef(profile, "io_uring", "sq_thread_idle", (int *)&g_kni_handle->iouring_conf.sq_thread_idle);
|
||||||
|
|
||||||
int tap_allow_mutilthread = 0;
|
int tap_allow_mutilthread = 0;
|
||||||
MESA_load_profile_int_nodef(profile, "tap", "tap_allow_mutilthread", &tap_allow_mutilthread);
|
MESA_load_profile_int_nodef(profile, "tap", "tap_allow_mutilthread", &tap_allow_mutilthread);
|
||||||
if (tap_allow_mutilthread)
|
if (tap_allow_mutilthread)
|
||||||
@@ -2677,6 +2710,7 @@ extern "C" int kni_init(){
|
|||||||
uint32_t bpf_hash_mode = 2;
|
uint32_t bpf_hash_mode = 2;
|
||||||
uint32_t bpf_queue_num = get_thread_count();
|
uint32_t bpf_queue_num = get_thread_count();
|
||||||
char bpf_obj[1024] = {0};
|
char bpf_obj[1024] = {0};
|
||||||
|
|
||||||
MESA_load_profile_int_nodef(profile, "tap", "bpf_debug_log", (int *)&bpf_debug_log);
|
MESA_load_profile_int_nodef(profile, "tap", "bpf_debug_log", (int *)&bpf_debug_log);
|
||||||
MESA_load_profile_int_nodef(profile, "tap", "bpf_hash_mode", (int *)&bpf_hash_mode);
|
MESA_load_profile_int_nodef(profile, "tap", "bpf_hash_mode", (int *)&bpf_hash_mode);
|
||||||
ret = MESA_load_profile_string_nodef(profile, "tap", "bpf_obj", bpf_obj, sizeof(bpf_obj));
|
ret = MESA_load_profile_string_nodef(profile, "tap", "bpf_obj", bpf_obj, sizeof(bpf_obj));
|
||||||
@@ -2740,8 +2774,29 @@ extern "C" int kni_init(){
|
|||||||
{
|
{
|
||||||
goto error_out;
|
goto error_out;
|
||||||
}
|
}
|
||||||
g_kni_handle->threads_handle[i].buff_size = KNI_MTU;
|
if (g_kni_handle->iouring_conf.enable_iouring)
|
||||||
g_kni_handle->threads_handle[i].buff = ALLOC(char, g_kni_handle->threads_handle[i].buff_size);
|
{
|
||||||
|
#if (SUPPORT_LIBURING)
|
||||||
|
g_kni_handle->threads_handle[i].iouring_handle = io_uring_handle_create(
|
||||||
|
g_kni_handle->iouring_conf.ring_size, g_kni_handle->iouring_conf.buff_size,
|
||||||
|
g_kni_handle->iouring_conf.flags, g_kni_handle->iouring_conf.sq_thread_idle);
|
||||||
|
if (g_kni_handle->threads_handle[i].iouring_handle == NULL)
|
||||||
|
{
|
||||||
|
KNI_LOG_ERROR(local_logger, TAP_RSS_LOG_TAG "Failed at kni create iouring");
|
||||||
|
goto error_out;
|
||||||
|
}
|
||||||
|
|
||||||
|
io_uring_add_read(i);
|
||||||
|
#else
|
||||||
|
KNI_LOG_ERROR(local_logger, TAP_RSS_LOG_TAG "Unsupport io_uring !!!");
|
||||||
|
exit(-1);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
g_kni_handle->threads_handle[i].buff_size = KNI_MTU;
|
||||||
|
g_kni_handle->threads_handle[i].buff = ALLOC(char, g_kni_handle->threads_handle[i].buff_size);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//init ssl dynamic bypass htable
|
//init ssl dynamic bypass htable
|
||||||
@@ -2819,3 +2874,159 @@ error_out:
|
|||||||
kni_destroy();
|
kni_destroy();
|
||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if (SUPPORT_LIBURING)
|
||||||
|
|
||||||
|
static int io_uring_add_write(int thread_seq, char *raw_data, uint16_t raw_len, addr_type_t addr_type)
|
||||||
|
{
|
||||||
|
int tap_fd = g_kni_handle->threads_handle[thread_seq].tap_fd;
|
||||||
|
struct io_uring_handle *handle = g_kni_handle->threads_handle[thread_seq].iouring_handle;
|
||||||
|
struct io_uring *ring = &handle->ring;
|
||||||
|
|
||||||
|
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||||
|
if (sqe == NULL)
|
||||||
|
{
|
||||||
|
KNI_LOG_ERROR(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "kni the io_uring submission queue is full, drop write packet");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct user_data *conn = io_uring_user_data_create(tap_fd, EVTYPE_WRITE, handle->buff_size);
|
||||||
|
conn->vec.iov_len = sizeof(struct ethhdr) + raw_len;
|
||||||
|
add_ether_header(conn->vec.iov_base, raw_data, raw_len, addr_type);
|
||||||
|
|
||||||
|
io_uring_prep_writev(sqe, conn->sockfd, &conn->vec, 1, 0);
|
||||||
|
io_uring_sqe_set_data(sqe, conn);
|
||||||
|
|
||||||
|
if (g_kni_handle->iouring_conf.enable_debuglog)
|
||||||
|
{
|
||||||
|
KNI_LOG_DEBUG(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "Add event: write, thread: %02d, sockfd: %02d, sqe: %p, user_data: %p, iov_base: %p, iovec_len: %04d",
|
||||||
|
thread_seq, conn->sockfd, sqe, sqe->user_data, conn->vec.iov_base, conn->vec.iov_len);
|
||||||
|
}
|
||||||
|
io_uring_submit(ring);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int io_uring_add_read(int thread_seq)
|
||||||
|
{
|
||||||
|
int tap_fd = g_kni_handle->threads_handle[thread_seq].tap_fd;
|
||||||
|
struct io_uring_handle *handle = g_kni_handle->threads_handle[thread_seq].iouring_handle;
|
||||||
|
struct io_uring *ring = &handle->ring;
|
||||||
|
|
||||||
|
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||||
|
if (sqe == NULL)
|
||||||
|
{
|
||||||
|
KNI_LOG_ERROR(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "kni the io_uring submission queue is full, drop read packet");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct user_data *conn = io_uring_user_data_create(tap_fd, EVTYPE_READ, handle->buff_size);
|
||||||
|
io_uring_prep_readv(sqe, conn->sockfd, &conn->vec, 1, 0);
|
||||||
|
io_uring_sqe_set_data(sqe, conn);
|
||||||
|
|
||||||
|
if (g_kni_handle->iouring_conf.enable_debuglog)
|
||||||
|
{
|
||||||
|
KNI_LOG_DEBUG(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "Add event: read, thread: %02d, sockfd: %02d, sqe: %p, user_data: %p, iov_base: %p, iovec_len: %04d",
|
||||||
|
thread_seq, conn->sockfd, sqe, sqe->user_data, conn->vec.iov_base, conn->vec.iov_len);
|
||||||
|
}
|
||||||
|
io_uring_submit(ring);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int io_uring_polling(int thread_seq)
|
||||||
|
{
|
||||||
|
int tap_fd = g_kni_handle->threads_handle[thread_seq].tap_fd;
|
||||||
|
MESA_htable_handle tuple2stream_htable = g_kni_handle->threads_handle[thread_seq].tuple2stream_htable;
|
||||||
|
struct io_uring_handle *handle = g_kni_handle->threads_handle[thread_seq].iouring_handle;
|
||||||
|
struct io_uring *ring = &handle->ring;
|
||||||
|
|
||||||
|
struct io_uring_cqe *cqes[MAX_BATCH_CQE_NUM];
|
||||||
|
int ret = io_uring_peek_batch_cqe(ring, cqes, MAX_BATCH_CQE_NUM);
|
||||||
|
if (ret <= 0)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < ret; i++)
|
||||||
|
{
|
||||||
|
struct io_uring_cqe *cqe = cqes[i];
|
||||||
|
if (cqe == NULL)
|
||||||
|
{
|
||||||
|
KNI_LOG_ERROR(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "kni iouring cqe return NULL");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (cqe->user_data == NULL)
|
||||||
|
{
|
||||||
|
KNI_LOG_ERROR(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "kni iouring cqe->user_data return NULL");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct user_data *conn = (struct user_data *)cqe->user_data;
|
||||||
|
if (g_kni_handle->iouring_conf.enable_debuglog)
|
||||||
|
{
|
||||||
|
KNI_LOG_DEBUG(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "Handle event: %s, thread: %02d, sockfd: %02d, cqe: %p, user_data: %p, iov_base: %p, iovec_len: %04d, cqe->res: %04d",
|
||||||
|
(conn->type == EVTYPE_READ ? "read" : (conn->type == EVTYPE_WRITE ? "write" : "unknown")),
|
||||||
|
thread_seq, conn->sockfd, cqe, cqe->user_data, conn->vec.iov_base, conn->vec.iov_len, cqe->res);
|
||||||
|
}
|
||||||
|
switch (conn->type)
|
||||||
|
{
|
||||||
|
case EVTYPE_READ:
|
||||||
|
if (cqe->res > 0)
|
||||||
|
{
|
||||||
|
struct ethhdr *ether_hdr = (struct ethhdr *)conn->vec.iov_base;
|
||||||
|
tuple2stream_htable_search(tuple2stream_htable, ether_hdr, thread_seq);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
KNI_LOG_ERROR(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "kni iouring tapfd[%d] read error, (%d, %s)", tap_fd, -cqe->res, strerror(-cqe->res));
|
||||||
|
}
|
||||||
|
io_uring_user_data_destory(conn);
|
||||||
|
io_uring_add_read(thread_seq);
|
||||||
|
break;
|
||||||
|
case EVTYPE_WRITE:
|
||||||
|
if (cqe->res < 0)
|
||||||
|
{
|
||||||
|
KNI_LOG_ERROR(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "kni iouring tapfd[%d] write error, (%d, %s)", tap_fd, -cqe->res, strerror(-cqe->res));
|
||||||
|
}
|
||||||
|
io_uring_user_data_destory(conn);
|
||||||
|
break;
|
||||||
|
case EVTYPE_UNKNOWN:
|
||||||
|
assert(0);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
assert(0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
io_uring_cqe_seen(&handle->ring, cqe);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
static int io_uring_add_write(int thread_seq, char *raw_data, uint16_t raw_len, addr_type_t addr_type)
|
||||||
|
{
|
||||||
|
KNI_LOG_ERROR(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "Unsupport io_uring !!!");
|
||||||
|
exit(-1);
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int io_uring_add_read(int thread_seq)
|
||||||
|
{
|
||||||
|
KNI_LOG_ERROR(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "Unsupport io_uring !!!");
|
||||||
|
exit(-1);
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int io_uring_polling(int thread_seq)
|
||||||
|
{
|
||||||
|
KNI_LOG_ERROR(g_kni_handle->local_logger, TAP_RSS_LOG_TAG "Unsupport io_uring !!!");
|
||||||
|
exit(-1);
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
79
entry/src/kni_iouring.cpp
Normal file
79
entry/src/kni_iouring.cpp
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
#include "kni_iouring.h"
|
||||||
|
|
||||||
|
#if (SUPPORT_LIBURING)
|
||||||
|
|
||||||
|
struct user_data *io_uring_user_data_create(int sockfd, enum evtype type, int buff_size)
|
||||||
|
{
|
||||||
|
struct user_data *conn = (struct user_data *)calloc(1, sizeof(struct user_data));
|
||||||
|
conn->sockfd = sockfd;
|
||||||
|
conn->type = type;
|
||||||
|
conn->vec.iov_base = (void *)calloc(buff_size, sizeof(char));
|
||||||
|
conn->vec.iov_len = buff_size;
|
||||||
|
|
||||||
|
return conn;
|
||||||
|
}
|
||||||
|
|
||||||
|
void io_uring_user_data_destory(struct user_data *conn)
|
||||||
|
{
|
||||||
|
if (conn)
|
||||||
|
{
|
||||||
|
if (conn->vec.iov_base)
|
||||||
|
{
|
||||||
|
free(conn->vec.iov_base);
|
||||||
|
conn->vec.iov_base = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
free(conn);
|
||||||
|
conn = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void io_uring_handle_destory(struct io_uring_handle *handle)
|
||||||
|
{
|
||||||
|
if (handle)
|
||||||
|
{
|
||||||
|
io_uring_queue_exit(&handle->ring);
|
||||||
|
|
||||||
|
free(handle);
|
||||||
|
handle = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct io_uring_handle *io_uring_handle_create(int ring_size, int buff_size, int flags, int sq_thread_idle)
|
||||||
|
{
|
||||||
|
struct io_uring_handle *handle = (struct io_uring_handle *)calloc(1, sizeof(struct io_uring_handle));
|
||||||
|
if (handle == NULL)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
handle->ring_size = ring_size;
|
||||||
|
handle->buff_size = buff_size;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* 参考资料:https://unixism.net/loti/tutorial/sq_poll.html#sq-poll
|
||||||
|
*
|
||||||
|
* 执行下面命令进行验证 IORING_SETUP_SQPOLL:
|
||||||
|
* sudo bpftrace -e 'tracepoint:io_uring:io_uring_submit_sqe {printf("%s(%d)\n", comm, pid);}'
|
||||||
|
*/
|
||||||
|
if (flags)
|
||||||
|
{
|
||||||
|
handle->params.flags |= flags;
|
||||||
|
}
|
||||||
|
if (sq_thread_idle)
|
||||||
|
{
|
||||||
|
handle->params.sq_thread_idle = sq_thread_idle; // milliseconds
|
||||||
|
}
|
||||||
|
|
||||||
|
int ret = io_uring_queue_init_params(ring_size, &handle->ring, &handle->params);
|
||||||
|
if (ret)
|
||||||
|
{
|
||||||
|
fprintf(stderr, "Failed at io_uring_queue_init_params(), %s\n", strerror(-ret));
|
||||||
|
io_uring_handle_destory(handle);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return handle;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
@@ -95,7 +95,7 @@ struct bpf_ctx *kni_tap_global_load_rss_bpf(const char *bpf_obj_file, uint32_t b
|
|||||||
|
|
||||||
if (bpf_prog_load(ctx->bpf_file, BPF_PROG_TYPE_SOCKET_FILTER, &ctx->bpf_obj, &ctx->bpf_prog_fd) < 0)
|
if (bpf_prog_load(ctx->bpf_file, BPF_PROG_TYPE_SOCKET_FILTER, &ctx->bpf_obj, &ctx->bpf_prog_fd) < 0)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to load bpf object %s, aborting: %s\n", ctx->bpf_file, strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to load bpf object %s, aborting: %s", ctx->bpf_file, strerror(errno));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -114,7 +114,7 @@ error:
|
|||||||
#else
|
#else
|
||||||
struct bpf_ctx *kni_tap_global_load_rss_bpf(const char *bpf_obj_file, uint32_t bpf_queue_num, uint32_t bpf_hash_mode, uint32_t bpf_debug_log, void *logger)
|
struct bpf_ctx *kni_tap_global_load_rss_bpf(const char *bpf_obj_file, uint32_t bpf_queue_num, uint32_t bpf_hash_mode, uint32_t bpf_debug_log, void *logger)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "not support bpf\n");
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "not support bpf");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
@@ -129,7 +129,7 @@ int kni_tap_open_per_thread(const char *tap_dev, int tap_flags, int bpf_prog_fd,
|
|||||||
tap_fd = open(TUN_PATH, O_RDWR);
|
tap_fd = open(TUN_PATH, O_RDWR);
|
||||||
if (tap_fd == -1)
|
if (tap_fd == -1)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to open " TUN_PATH ", aborting: %s\n", strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to open " TUN_PATH ", aborting: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,7 +138,7 @@ int kni_tap_open_per_thread(const char *tap_dev, int tap_flags, int bpf_prog_fd,
|
|||||||
strcpy(ifr.ifr_name, tap_dev);
|
strcpy(ifr.ifr_name, tap_dev);
|
||||||
if (ioctl(tap_fd, TUNSETIFF, &ifr) == -1)
|
if (ioctl(tap_fd, TUNSETIFF, &ifr) == -1)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to attach %s, aborting: %s\n", tap_dev, strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to attach %s, aborting: %s", tap_dev, strerror(errno));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -148,7 +148,7 @@ int kni_tap_open_per_thread(const char *tap_dev, int tap_flags, int bpf_prog_fd,
|
|||||||
*/
|
*/
|
||||||
if (ioctl(tap_fd, TUNSETPERSIST, 1) == -1)
|
if (ioctl(tap_fd, TUNSETPERSIST, 1) == -1)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to set persist on %s, aborting: %s\n", tap_dev, strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to set persist on %s, aborting: %s", tap_dev, strerror(errno));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -158,7 +158,7 @@ int kni_tap_open_per_thread(const char *tap_dev, int tap_flags, int bpf_prog_fd,
|
|||||||
// Set bpf
|
// Set bpf
|
||||||
if (ioctl(tap_fd, TUNSETSTEERINGEBPF, (void *)&bpf_prog_fd) == -1)
|
if (ioctl(tap_fd, TUNSETSTEERINGEBPF, (void *)&bpf_prog_fd) == -1)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to set bpf on %s, aborting: %s\n", tap_dev, strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to set bpf on %s, aborting: %s", tap_dev, strerror(errno));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -168,13 +168,13 @@ int kni_tap_open_per_thread(const char *tap_dev, int tap_flags, int bpf_prog_fd,
|
|||||||
nonblock_flags = fcntl(tap_fd, F_GETFL);
|
nonblock_flags = fcntl(tap_fd, F_GETFL);
|
||||||
if (nonblock_flags == -1)
|
if (nonblock_flags == -1)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to get nonblock flags on %s fd, aborting: %s\n", tap_dev, strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to get nonblock flags on %s fd, aborting: %s", tap_dev, strerror(errno));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
nonblock_flags |= O_NONBLOCK;
|
nonblock_flags |= O_NONBLOCK;
|
||||||
if (fcntl(tap_fd, F_SETFL, nonblock_flags) == -1)
|
if (fcntl(tap_fd, F_SETFL, nonblock_flags) == -1)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to set nonblock flags on %s fd, aborting: %s\n", tap_dev, strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to set nonblock flags on %s fd, aborting: %s", tap_dev, strerror(errno));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -182,7 +182,7 @@ int kni_tap_open_per_thread(const char *tap_dev, int tap_flags, int bpf_prog_fd,
|
|||||||
fd = socket(PF_INET, SOCK_DGRAM, 0);
|
fd = socket(PF_INET, SOCK_DGRAM, 0);
|
||||||
if (fd == -1)
|
if (fd == -1)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to create socket, aborting: %s\n", strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to create socket, aborting: %s", strerror(errno));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -190,14 +190,14 @@ int kni_tap_open_per_thread(const char *tap_dev, int tap_flags, int bpf_prog_fd,
|
|||||||
strcpy(ifr.ifr_name, tap_dev);
|
strcpy(ifr.ifr_name, tap_dev);
|
||||||
if (ioctl(fd, SIOCGIFMTU, &ifr) < 0)
|
if (ioctl(fd, SIOCGIFMTU, &ifr) < 0)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to get MTU on %s, aborting: %s\n", tap_dev, strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to get MTU on %s, aborting: %s", tap_dev, strerror(errno));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set eth up
|
// Set eth up
|
||||||
if (ioctl(fd, SIOCGIFFLAGS, &ifr) == -1)
|
if (ioctl(fd, SIOCGIFFLAGS, &ifr) == -1)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to get link status on %s, aborting: %s\n", tap_dev, strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to get link status on %s, aborting: %s", tap_dev, strerror(errno));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -206,12 +206,12 @@ int kni_tap_open_per_thread(const char *tap_dev, int tap_flags, int bpf_prog_fd,
|
|||||||
ifr.ifr_flags |= IFF_UP;
|
ifr.ifr_flags |= IFF_UP;
|
||||||
if (ioctl(fd, SIOCSIFFLAGS, &ifr) < 0)
|
if (ioctl(fd, SIOCSIFFLAGS, &ifr) < 0)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to set link status on %s, aborting: %s\n", tap_dev, strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to set link status on %s, aborting: %s", tap_dev, strerror(errno));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
KNI_LOG_INFO(logger, TAP_RSS_LOG_TAG "using tap device %s with MTU %d\n", tap_dev, ifr.ifr_mtu);
|
KNI_LOG_INFO(logger, TAP_RSS_LOG_TAG "using tap device %s with MTU %d", tap_dev, ifr.ifr_mtu);
|
||||||
close(fd);
|
close(fd);
|
||||||
|
|
||||||
return tap_fd;
|
return tap_fd;
|
||||||
@@ -248,7 +248,7 @@ int kni_tap_read_per_thread(int tap_fd, char *buff, int buff_size, void *logger)
|
|||||||
{
|
{
|
||||||
if (errno != EWOULDBLOCK && errno != EAGAIN)
|
if (errno != EWOULDBLOCK && errno != EAGAIN)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to read data from tapfd %d, aborting: %s\n", tap_fd, strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "unable to read data from tapfd %d, aborting: %s", tap_fd, strerror(errno));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -260,7 +260,7 @@ int kni_tap_write_per_thread(int tap_fd, const char *data, int data_len, void *l
|
|||||||
int ret = write(tap_fd, data, data_len);
|
int ret = write(tap_fd, data, data_len);
|
||||||
if (ret != data_len)
|
if (ret != data_len)
|
||||||
{
|
{
|
||||||
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "need send %dB, only send %dB, aborting: %s\n", data_len, ret, strerror(errno));
|
KNI_LOG_ERROR(logger, TAP_RSS_LOG_TAG "need send %dB, only send %dB, aborting: %s", data_len, ret, strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
Reference in New Issue
Block a user