diff --git a/bpf/bpf_tun_rss_steering.c b/bpf/bpf_tun_rss_steering.c index c07166a..cbc60c3 100644 --- a/bpf/bpf_tun_rss_steering.c +++ b/bpf/bpf_tun_rss_steering.c @@ -229,7 +229,7 @@ static inline int parse_packet(struct packet *packet, struct bpf_config *config) * +----+----+----+----------------------------------+ * If "More fragments" or the offset is nonzero, then this is an IP fragment (RFC791). */ - packet->is_fragmented = !(bpf_ntohs(ip.frag_off) & 0x4000); + packet->is_fragmented = !!(bpf_ntohs(ip.frag_off) & (0x2000 | 0x1fff)); l4_protocol = ip.protocol; l4_offset = ip.ihl * 4; diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 4190122..661ad87 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,10 +1,11 @@ add_library( common src/tfe_utils.cpp src/tfe_types.cpp src/tfe_future.cpp src/tfe_http.cpp src/tfe_plugin.cpp src/tfe_rpc.cpp src/tfe_cmsg.cpp src/tfe_kafka_logger.cpp src/tfe_resource.cpp src/tfe_scan.cpp - src/tfe_pkt_util.cpp src/tfe_tcp_restore.cpp src/raw_socket.cpp src/packet_construct.cpp + src/tfe_pkt_util.cpp src/tfe_tcp_restore.cpp src/packet_construct.cpp src/tap.cpp src/io_uring.cpp src/intercept_policy.cpp src/tfe_fieldstat.cpp src/tfe_addr_tuple4.cpp src/tfe_packet_io.cpp src/tfe_session_table.cpp - src/tfe_ctrl_packet.cpp src/tfe_raw_packet.cpp src/tfe_packet_io_fs.cpp src/mpack.cpp ) + src/tfe_ctrl_packet.cpp src/tfe_raw_packet.cpp src/tfe_packet_io_fs.cpp + src/mpack.cpp) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/../bpf/) target_include_directories(common PRIVATE ${CMAKE_CURRENT_LIST_DIR}/../platform/include/internal) diff --git a/common/include/io_uring.h b/common/include/io_uring.h index 5964f9c..46e5201 100644 --- a/common/include/io_uring.h +++ b/common/include/io_uring.h @@ -27,15 +27,15 @@ struct io_uring_instance; struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int ring_size, int buff_size, int flags, int sq_thread_idle, int enable_debug); void io_uring_instance_destory(struct io_uring_instance *instance); -typedef void read_callback(const char *data, int len, void *args); +typedef void io_uring_read_cb(const char *data, int len, void *args); // return 0 : success // reutrn -1 : error -int io_uring_register_read_callback(struct io_uring_instance *instance, read_callback *read_cb, void *cb_arg); +int io_uring_set_read_cb(struct io_uring_instance *instance, io_uring_read_cb *read_cb, void *cb_arg); // return 0 : success // reutrn -1 : error -int io_uring_submit_write_entry(struct io_uring_instance *instance, const char *data, int len); +int io_uring_write(struct io_uring_instance *instance, const char *data, int len); // returns the number of processed entrys -int io_uring_peek_ready_entrys(struct io_uring_instance *instance); +int io_uring_polling(struct io_uring_instance *instance); #ifdef __cpluscplus } diff --git a/common/include/raw_socket.h b/common/include/raw_socket.h deleted file mode 100644 index 337197b..0000000 --- a/common/include/raw_socket.h +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef _RAW_SOCKET_H -#define _RAW_SOCKET_H - -#ifdef __cpluscplus -extern "C" -{ -#endif - -#include -#include -#include -#include - -struct raw_socket -{ - int sockfd; - char interface[16]; - struct ether_addr mac_addr; - struct sockaddr_ll sockaddr; -}; - -struct raw_socket *raw_socket_create(const char *interface, int fd_so_mask); -void raw_socket_destory(struct raw_socket *raw); -int raw_socket_send(struct raw_socket *raw, const char *data, int data_len); -int raw_socket_recv(struct raw_socket *raw, char *buff, int buff_size); - -#ifdef __cpluscplus -} -#endif - -#endif diff --git a/common/include/tfe_packet_io.h b/common/include/tfe_packet_io.h index 2ffbf4b..c77d0de 100644 --- a/common/include/tfe_packet_io.h +++ b/common/include/tfe_packet_io.h @@ -22,9 +22,6 @@ struct tap_ctx struct io_uring_instance *io_uring_fd; struct io_uring_instance *io_uring_c; struct io_uring_instance *io_uring_s; - - int buff_size; - char *buff; }; struct packet_io_thread_ctx diff --git a/common/include/tfe_tcp_restore.h b/common/include/tfe_tcp_restore.h index e706c88..e583d63 100644 --- a/common/include/tfe_tcp_restore.h +++ b/common/include/tfe_tcp_restore.h @@ -40,6 +40,10 @@ struct tcp_restore_info void tfe_tcp_restore_info_dump(const struct tcp_restore_info *info); int tfe_tcp_restore_fd_create(const struct tcp_restore_endpoint *endpoint, const struct tcp_restore_endpoint *peer, const char *devname, unsigned int fd_so_mask); +int tfe_tcp_restore_syn_packet(struct tcp_restore_info *info, struct ether_addr *client_mac, struct ether_addr *server_mac, char *buffer, int size); +int tfe_tcp_restore_synack_packet(struct tcp_restore_info *info, struct ether_addr *client_mac, struct ether_addr *server_mac, char *buffer, int size); +int tfe_tcp_restore_ack_packet(struct tcp_restore_info *info, struct ether_addr *client_mac, struct ether_addr *server_mac, char *buffer, int size); + #ifdef __cpluscplus } #endif diff --git a/common/include/tfe_utils.h b/common/include/tfe_utils.h index cc9dd37..cf3d158 100644 --- a/common/include/tfe_utils.h +++ b/common/include/tfe_utils.h @@ -26,6 +26,8 @@ #define TFE_PATH_MAX 256 #define TFE_SYMBOL_MAX 64 #define TFE_THREAD_MAX 128 +#define TFE_FAKE_C_DEFAULT_TTL 60 +#define TFE_FAKE_S_DEFAULT_TTL 65 #ifndef TFE_CONFIG_BACKLOG_DEFAULT #define TFE_CONFIG_BACKLOG_DEFAULT 20 diff --git a/common/src/io_uring.cpp b/common/src/io_uring.cpp index 4066ce1..1c5fcd3 100644 --- a/common/src/io_uring.cpp +++ b/common/src/io_uring.cpp @@ -6,19 +6,35 @@ extern void *g_default_logger; #if (SUPPORT_LIBURING) + +#include +#include +#include +#include #include -enum entry_type +enum ioevent { - ENTRY_TYPE_UNKNOWN = 0, - ENTRY_TYPE_READ = 1, - ENTRY_TYPE_WRITE = 2, + IOEVENT_UNKNOWN = 0, + IOEVENT_READ = 1, + IOEVENT_WRITE = 2, }; -struct user_data +struct iobuffer { - enum entry_type type; struct iovec vec; + enum ioevent event; + struct iobuffer *next; +}; + +struct iobuffer_pool +{ + int buffer_size; + int buffer_num; + int buffer_used; + int buffer_left; + + struct iobuffer *free_list; }; struct config @@ -34,74 +50,239 @@ struct io_uring_instance { int sockfd; int eventfd; + struct config config; struct io_uring ring; struct io_uring_params params; - void *read_cb_args; - read_callback *read_cb; - - int read_buff_num; - struct user_data **read_buffs; + void *cb_args; + io_uring_read_cb *read_cb; + struct iobuffer_pool *pool; }; -static const char *entry_type_to_string(enum entry_type type) +/****************************************************************************** + * + ******************************************************************************/ + +static const char *ioevent_tostring(enum ioevent event); + +static struct iobuffer *iobuffer_create(int buffer_size); +static void iobuffer_destory(struct iobuffer *buffer); + +struct iobuffer_pool *iobuffer_pool_create(int buffer_size, int buffer_num); +void iobuffer_pool_destory(struct iobuffer_pool *pool); +void iobuffer_pool_print(struct iobuffer_pool *pool); + +struct iobuffer *iobuffer_pool_pop(struct iobuffer_pool *pool); +void iobuffer_pool_push(struct iobuffer_pool *pool, struct iobuffer *buffer); + +struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int ring_size, int buff_size, int flags, int sq_thread_idle, int enable_debug); +void io_uring_instance_destory(struct io_uring_instance *instance); + +static int io_uring_read(struct io_uring_instance *instance, struct iobuffer *buffer); +int io_uring_write(struct io_uring_instance *instance, const char *data, int len); + +int io_uring_set_read_cb(struct io_uring_instance *instance, io_uring_read_cb *read_cb, void *cb_args); +int io_uring_polling(struct io_uring_instance *instance); + +/****************************************************************************** + * + ******************************************************************************/ + +static const char *ioevent_tostring(enum ioevent event) { - switch (type) + switch (event) { - case ENTRY_TYPE_READ: + case IOEVENT_READ: return "read"; - case ENTRY_TYPE_WRITE: + case IOEVENT_WRITE: return "write"; - case ENTRY_TYPE_UNKNOWN: /* fall passthrough */ + case IOEVENT_UNKNOWN: /* fall passthrough */ default: return "unknown"; } } -static void user_data_destory(struct user_data *data) +static struct iobuffer *iobuffer_create(int buffer_size) { - if (data) + struct iobuffer *buffer = (struct iobuffer *)calloc(1, sizeof(struct iobuffer)); + if (buffer == NULL) { - free(data); - data = NULL; + goto error_out; } -} - -static struct user_data *user_data_create(int buff_size) -{ - struct user_data *data = (struct user_data *)calloc(1, sizeof(struct user_data) + buff_size * sizeof(char)); - data->vec.iov_base = (void *)data + sizeof(struct user_data); - data->vec.iov_len = buff_size; - - return data; -} - -void io_uring_instance_destory(struct io_uring_instance *instance) -{ - if (instance) + buffer->vec.iov_len = buffer_size; + buffer->vec.iov_base = (void *)calloc(buffer->vec.iov_len, sizeof(char)); + if (buffer->vec.iov_base == NULL) { - io_uring_queue_exit(&instance->ring); + goto error_out; + } + buffer->event = IOEVENT_UNKNOWN; + buffer->next = NULL; - if (instance->read_buffs) + return buffer; + +error_out: + iobuffer_destory(buffer); + return NULL; +} + +static void iobuffer_destory(struct iobuffer *buffer) +{ + if (buffer) + { + if (buffer->vec.iov_base) { - for (int i = 0; i < instance->read_buff_num; i++) - { - if (instance->read_buffs[i]) - { - user_data_destory(instance->read_buffs[i]); - instance->read_buffs[i] = NULL; - } - } - - free(instance->read_buffs); - instance->read_buffs = NULL; + free(buffer->vec.iov_base); + buffer->vec.iov_base = NULL; } - free(instance); - instance = NULL; + + free(buffer); + buffer = NULL; } } +struct iobuffer_pool *iobuffer_pool_create(int buffer_size, int buffer_num) +{ + struct iobuffer *head = NULL; + struct iobuffer *tail = NULL; + struct iobuffer *node = NULL; + struct iobuffer_pool *pool = (struct iobuffer_pool *)calloc(1, sizeof(struct iobuffer_pool)); + if (pool == NULL) + { + goto error_out; + } + + pool->buffer_size = buffer_size; + pool->buffer_used = 0; + + for (int i = 0; i < buffer_num; i++) + { + node = iobuffer_create(pool->buffer_size); + if (node == NULL) + { + goto error_out; + } + + if (head == NULL) + { + head = node; + tail = node; + } + else + { + tail->next = node; + tail = node; + } + pool->buffer_left++; + pool->buffer_num++; + } + pool->free_list = head; + + return pool; + +error_out: + iobuffer_pool_destory(pool); + return NULL; +} + +void iobuffer_pool_destory(struct iobuffer_pool *pool) +{ + if (pool) + { + struct iobuffer *next = NULL; + struct iobuffer *node = pool->free_list; + while (node) + { + next = node->next; + iobuffer_destory(node); + node = next; + } + + free(pool); + pool = NULL; + } +} + +void iobuffer_pool_print(struct iobuffer_pool *pool) +{ + if (pool) + { + printf(" pool->buffer_size : %d\n", pool->buffer_size); + printf(" pool->buffer_num : %d\n", pool->buffer_num); + printf(" pool->buffer_used : %d\n", pool->buffer_used); + printf(" pool->buffer_left : %d\n", pool->buffer_left); + printf(" pool->free_list : \n"); + + struct iobuffer *node = pool->free_list; + while (node) + { + printf(" node : %p\n", node); + printf(" node->next : %p\n", node->next); + node = node->next; + } + printf("\n"); + } +} + +struct iobuffer *iobuffer_pool_pop(struct iobuffer_pool *pool) +{ + struct iobuffer *buffer = NULL; + if (pool == NULL) + { + assert(0); + return NULL; + } + + if (pool->buffer_left <= 0) + { + buffer = iobuffer_create(pool->buffer_size); + if (buffer) + { + pool->buffer_used++; + return buffer; + } + else + { + return NULL; + } + } + + if (pool->free_list == NULL) + { + assert(0); + return NULL; + } + + buffer = pool->free_list; + pool->free_list = buffer->next; + buffer->next = NULL; + pool->buffer_used++; + pool->buffer_left--; + + return buffer; +} + +void iobuffer_pool_push(struct iobuffer_pool *pool, struct iobuffer *buffer) +{ + if (pool == NULL || buffer == NULL) + { + return; + } + + if (pool->buffer_left >= pool->buffer_num) + { + iobuffer_destory(buffer); + pool->buffer_used--; + return; + } + + buffer->event = IOEVENT_UNKNOWN; + buffer->next = pool->free_list; + pool->free_list = buffer; + pool->buffer_left++; + pool->buffer_used--; +} + /* * ring_size : 1024 * buff_size : 2048 @@ -134,16 +315,11 @@ struct io_uring_instance *io_uring_instance_create(int sockfd, int eventfd, int instance->config.sq_thread_idle = sq_thread_idle; instance->config.enable_debug = enable_debug; - instance->read_buff_num = 1; - instance->read_buffs = (struct user_data **)calloc(instance->read_buff_num, sizeof(struct user_data *)); - for (int i = 0; i < instance->read_buff_num; i++) + instance->pool = iobuffer_pool_create(instance->config.buff_size, instance->config.ring_size); + if (instance->pool == NULL) { - instance->read_buffs[i] = user_data_create(instance->config.buff_size); - if (instance->read_buffs[i] == NULL) - { - TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to create user_data, %s", strerror(errno)); - goto error_out; - } + TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to create io buffer pool, %s", strerror(errno)); + goto error_out; } /* @@ -186,9 +362,20 @@ error_out: return NULL; } +void io_uring_instance_destory(struct io_uring_instance *instance) +{ + if (instance) + { + io_uring_queue_exit(&instance->ring); + iobuffer_pool_destory(instance->pool); + free(instance); + instance = NULL; + } +} + // return 0 : success // reutrn -1 : error -int io_uring_submit_read_entry(struct io_uring_instance *instance, struct user_data *data) +static int io_uring_read(struct io_uring_instance *instance, struct iobuffer *buffer) { struct io_uring_sqe *sqe = io_uring_get_sqe(&instance->ring); if (sqe == NULL) @@ -197,14 +384,14 @@ int io_uring_submit_read_entry(struct io_uring_instance *instance, struct user_d return -1; } - data->type = ENTRY_TYPE_READ; - io_uring_prep_readv(sqe, instance->sockfd, &data->vec, 1, 0); - io_uring_sqe_set_data(sqe, data); + buffer->event = IOEVENT_READ; + io_uring_prep_readv(sqe, instance->sockfd, &buffer->vec, 1, 0); + io_uring_sqe_set_data(sqe, buffer); if (instance->config.enable_debug) { TFE_LOG_DEBUG(g_default_logger, "IO_URING: submit read entry: sockfd: %02d sqe: %p user_data: %p iov_base: %p iovec_len: %04ld", - instance->sockfd, sqe, (void *)sqe->user_data, data->vec.iov_base, data->vec.iov_len); + instance->sockfd, sqe, (void *)sqe->user_data, buffer->vec.iov_base, buffer->vec.iov_len); } io_uring_submit(&instance->ring); @@ -213,7 +400,7 @@ int io_uring_submit_read_entry(struct io_uring_instance *instance, struct user_d // return 0 : success // reutrn -1 : error -int io_uring_submit_write_entry(struct io_uring_instance *instance, const char *data, int len) +int io_uring_write(struct io_uring_instance *instance, const char *data, int len) { struct io_uring_sqe *sqe = io_uring_get_sqe(&instance->ring); if (sqe == NULL) @@ -228,17 +415,23 @@ int io_uring_submit_write_entry(struct io_uring_instance *instance, const char * return -1; } - struct user_data *user_data = user_data_create(instance->config.buff_size); - user_data->type = ENTRY_TYPE_WRITE; - user_data->vec.iov_len = len; - memcpy(user_data->vec.iov_base, data, len); - io_uring_prep_writev(sqe, instance->sockfd, &user_data->vec, 1, 0); - io_uring_sqe_set_data(sqe, user_data); + struct iobuffer *buffer = iobuffer_pool_pop(instance->pool); + if (buffer == NULL) + { + TFE_LOG_ERROR(g_default_logger, "IO_URING: cannot get fixed buffer"); + return -1; + } + + buffer->event = IOEVENT_WRITE; + buffer->vec.iov_len = len; + memcpy(buffer->vec.iov_base, data, len); + io_uring_prep_writev(sqe, instance->sockfd, &buffer->vec, 1, 0); + io_uring_sqe_set_data(sqe, buffer); if (instance->config.enable_debug) { TFE_LOG_DEBUG(g_default_logger, "IO_URING: submit write entry: sockfd: %02d sqe: %p user_data: %p iov_base: %p iovec_len: %04ld", - instance->sockfd, sqe, (void *)sqe->user_data, user_data->vec.iov_base, user_data->vec.iov_len); + instance->sockfd, sqe, (void *)sqe->user_data, buffer->vec.iov_base, buffer->vec.iov_len); } io_uring_submit(&instance->ring); @@ -247,7 +440,7 @@ int io_uring_submit_write_entry(struct io_uring_instance *instance, const char * // return 0 : success // reutrn -1 : error -int io_uring_register_read_callback(struct io_uring_instance *instance, read_callback *read_cb, void *read_cb_args) +int io_uring_set_read_cb(struct io_uring_instance *instance, io_uring_read_cb *read_cb, void *cb_args) { if (instance->read_cb) { @@ -256,30 +449,42 @@ int io_uring_register_read_callback(struct io_uring_instance *instance, read_cal } instance->read_cb = read_cb; - instance->read_cb_args = read_cb_args; + instance->cb_args = cb_args; - for (int i = 0; i < instance->read_buff_num; i++) + struct iobuffer *buffer = iobuffer_pool_pop(instance->pool); + if (buffer == NULL) { - struct user_data *data = instance->read_buffs[i]; - if (io_uring_submit_read_entry(instance, data) == -1) - { - return -1; - } + TFE_LOG_ERROR(g_default_logger, "IO_URING: cannot get fixed buffer"); + return -1; + } + + if (io_uring_read(instance, buffer) == -1) + { + iobuffer_pool_push(instance->pool, buffer); + return -1; } return 0; } // returns the number of processed entrys -int io_uring_peek_ready_entrys(struct io_uring_instance *instance) +int io_uring_polling(struct io_uring_instance *instance) { int ret = 0; int total = 0; struct io_uring_cqe *cqes[MAX_BATCH_CQE_NUM]; + struct io_uring_cqe *cqe = NULL; + struct iobuffer *buffer = NULL; + + void *cb_args = instance->cb_args; + struct io_uring *ring = &instance->ring; + struct iobuffer_pool *pool = instance->pool; + io_uring_read_cb *read_cb = instance->read_cb; + int enable_debug = instance->config.enable_debug; while (1) { - ret = io_uring_peek_batch_cqe(&instance->ring, cqes, MAX_BATCH_CQE_NUM); + ret = io_uring_peek_batch_cqe(ring, cqes, MAX_BATCH_CQE_NUM); if (ret <= 0) { return total; @@ -288,63 +493,58 @@ int io_uring_peek_ready_entrys(struct io_uring_instance *instance) total += ret; for (int i = 0; i < ret; i++) { - struct io_uring_cqe *cqe = cqes[i]; + cqe = cqes[i]; if (cqe == NULL || (void *)cqe->user_data == NULL) { // TFE_LOG_ERROR(g_default_logger, "IO_URING: unable to get io_uring cqe, null is return"); continue; } - struct user_data *user_data = (struct user_data *)cqe->user_data; - if (instance->config.enable_debug) + buffer = (struct iobuffer *)cqe->user_data; + if (enable_debug) { TFE_LOG_DEBUG(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d cqe: %p user_data: %p iov_base: %p iovec_len: %04ld cqe->res: %04d", - entry_type_to_string(user_data->type), instance->sockfd, cqe, (void *)user_data, user_data->vec.iov_base, user_data->vec.iov_len, cqe->res); + ioevent_tostring(buffer->event), instance->sockfd, cqe, (void *)buffer, buffer->vec.iov_base, buffer->vec.iov_len, cqe->res); } - switch (user_data->type) + switch (buffer->event) { - case ENTRY_TYPE_READ: + case IOEVENT_READ: if (cqe->res > 0) { - if (instance->read_cb != NULL) + if (read_cb) { - instance->read_cb((const char *)user_data->vec.iov_base, cqe->res, instance->read_cb_args); + read_cb((const char *)buffer->vec.iov_base, cqe->res, cb_args); } } - else - { - TFE_LOG_ERROR(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d (%d, %s)", - entry_type_to_string(user_data->type), instance->sockfd, -cqe->res, strerror(-cqe->res)); - } cqe->user_data = 0; - io_uring_cqe_seen(&instance->ring, cqe); - io_uring_submit_read_entry(instance, user_data); + io_uring_cqe_seen(ring, cqe); + io_uring_read(instance, buffer); break; - case ENTRY_TYPE_WRITE: - if (cqe->res > 0) - { - // data->write_cb - } - else + case IOEVENT_WRITE: + if (cqe->res < 0) { TFE_LOG_ERROR(g_default_logger, "IO_URING: handle %s entry: sockfd: %02d (%d, %s)", - entry_type_to_string(user_data->type), instance->sockfd, -cqe->res, strerror(-cqe->res)); + ioevent_tostring(buffer->event), instance->sockfd, -cqe->res, strerror(-cqe->res)); } - user_data_destory(user_data); + iobuffer_pool_push(pool, buffer); cqe->user_data = 0; - io_uring_cqe_seen(&instance->ring, cqe); + io_uring_cqe_seen(ring, cqe); break; default: - user_data_destory(user_data); + iobuffer_pool_push(pool, buffer); cqe->user_data = 0; - io_uring_cqe_seen(&instance->ring, cqe); + io_uring_cqe_seen(ring, cqe); break; } } + + if (ret < MAX_BATCH_CQE_NUM) + { + return total; + } } } #else - struct io_uring_instance { }; @@ -360,23 +560,21 @@ void io_uring_instance_destory(struct io_uring_instance *instance) TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system"); } -int io_uring_register_read_callback(struct io_uring_instance *instance, read_callback *read_cb, void *cb_arg) -{ - TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system"); - - return -1; -} - -int io_uring_submit_write_entry(struct io_uring_instance *instance, const char *data, int len) +int io_uring_set_read_cb(struct io_uring_instance *instance, io_uring_read_cb *read_cb, void *cb_arg) { TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system"); return -1; } -int io_uring_peek_ready_entrys(struct io_uring_instance *instance) +int io_uring_write(struct io_uring_instance *instance, const char *data, int len) +{ + TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system"); + return -1; +} + +int io_uring_polling(struct io_uring_instance *instance) { TFE_LOG_ERROR(g_default_logger, "IO_URING: feature not support on current system"); return 0; } - #endif \ No newline at end of file diff --git a/common/src/raw_socket.cpp b/common/src/raw_socket.cpp deleted file mode 100644 index 177dfbb..0000000 --- a/common/src/raw_socket.cpp +++ /dev/null @@ -1,101 +0,0 @@ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "raw_socket.h" -#include "packet_construct.h" - -void raw_socket_destory(struct raw_socket *raw) -{ - if (raw) - { - if (raw->sockfd) - { - close(raw->sockfd); - } - free(raw); - raw = NULL; - } -} - -struct raw_socket *raw_socket_create(const char *interface, int fd_so_mask) -{ - struct ifreq ifr = {0}; - - struct raw_socket *raw = (struct raw_socket *)calloc(1, sizeof(struct raw_socket)); - memcpy(raw->interface, interface, strlen(interface)); - - raw->sockfd = socket(PF_PACKET, SOCK_RAW, htons(ETH_P_ALL)); - if (raw->sockfd < 0) - { - printf("socket() failed, (%d: %s)", errno, strerror(errno)); - goto error; - } - - snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", raw->interface); - if (ioctl(raw->sockfd, SIOCGIFHWADDR, &ifr) < 0) - { - printf("ioctl(SIOCGIFHWADDR) failed, (%d: %s)", errno, strerror(errno)); - goto error; - } - memcpy(&(raw->mac_addr), ifr.ifr_hwaddr.sa_data, 6); - - raw->sockaddr.sll_ifindex = if_nametoindex(interface); - if (raw->sockaddr.sll_ifindex == 0) - { - printf("if_nametoindex() failed, (%d: %s)", errno, strerror(errno)); - goto error; - } - raw->sockaddr.sll_family = AF_PACKET; - memcpy((void *)&(raw->sockaddr.sll_addr), (void *)&(raw->mac_addr), 6); - raw->sockaddr.sll_halen = 6; - - if (setsockopt(raw->sockfd, SOL_SOCKET, SO_MARK, (char *)&fd_so_mask, sizeof(fd_so_mask)) < 0) - { - printf("setsockopt(SO_MARK) failed, (%d: %s)", errno, strerror(errno)); - goto error; - } - - return raw; - -error: - raw_socket_destory(raw); - - return NULL; -} - -int raw_socket_send(struct raw_socket *raw, const char *data, int data_len) -{ - int ret = sendto(raw->sockfd, data, data_len, 0, (struct sockaddr *)&raw->sockaddr, sizeof(raw->sockaddr)); - if (ret < 0) - { - printf("sendto() failed, (%d: %s)", errno, strerror(errno)); - } - - return ret; -} - -int raw_socket_recv(struct raw_socket *raw, char *buff, int buff_size) -{ - socklen_t addr_len = sizeof(raw->sockaddr); - int ret = recvfrom(raw->sockfd, buff, buff_size, 0, (struct sockaddr *)&raw->sockaddr, &addr_len); - if (ret < 0) - { - printf("sendto() failed, (%d: %s)", errno, strerror(errno)); - } - - return ret; -} \ No newline at end of file diff --git a/common/src/tfe_cmsg.cpp b/common/src/tfe_cmsg.cpp index 6b03716..2f79a68 100644 --- a/common/src/tfe_cmsg.cpp +++ b/common/src/tfe_cmsg.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include "tfe_types.h" #include "tfe_utils.h" @@ -26,7 +25,6 @@ struct tfe_cmsg { uint8_t flag; uint8_t ref; - pthread_rwlock_t rwlock; uint16_t nr_tlvs; struct tfe_cmsg_tlv* tlvs[TFE_CMSG_TLV_NR_MAX]; uint16_t size; @@ -44,8 +42,6 @@ struct tfe_cmsg* tfe_cmsg_init() struct tfe_cmsg *cmsg = ALLOC(struct tfe_cmsg, 1); cmsg->size = sizeof(struct tfe_cmsg_serialize_header); - pthread_rwlock_init(&(cmsg->rwlock), NULL); - ATOMIC_ZERO(&(cmsg->flag)); ATOMIC_ZERO(&(cmsg->ref)); ATOMIC_INC(&(cmsg->ref)); @@ -58,13 +54,10 @@ void tfe_cmsg_destroy(struct tfe_cmsg **cmsg) { if ((__sync_sub_and_fetch(&((*cmsg)->ref), 1) != 0)) return; - pthread_rwlock_wrlock(&((*cmsg)->rwlock)); for(int i = 0; i < TFE_CMSG_TLV_NR_MAX; i++) { FREE(&((*cmsg)->tlvs[i])); } - pthread_rwlock_unlock(&((*cmsg)->rwlock)); - pthread_rwlock_destroy(&((*cmsg)->rwlock)); FREE(cmsg); } } @@ -98,7 +91,6 @@ int tfe_cmsg_set(struct tfe_cmsg * cmsg, enum tfe_cmsg_tlv_type type, const unsi { return TFE_CMSG_INVALID_TYPE; } - pthread_rwlock_wrlock(&cmsg->rwlock); struct tfe_cmsg_tlv *tlv = cmsg->tlvs[type]; uint16_t length = sizeof(struct tfe_cmsg_tlv) + size; @@ -122,7 +114,6 @@ int tfe_cmsg_set(struct tfe_cmsg * cmsg, enum tfe_cmsg_tlv_type type, const unsi tlv->length = length; memcpy(tlv->value_as_string, value, size); cmsg->tlvs[type] = tlv; - pthread_rwlock_unlock(&cmsg->rwlock); return 0; } @@ -139,7 +130,6 @@ int tfe_cmsg_get_value(struct tfe_cmsg * cmsg, enum tfe_cmsg_tlv_type type, unsi goto errout; } - pthread_rwlock_rdlock(&cmsg->rwlock); tlv = cmsg->tlvs[type]; if (unlikely(tlv == NULL)) { @@ -156,37 +146,30 @@ int tfe_cmsg_get_value(struct tfe_cmsg * cmsg, enum tfe_cmsg_tlv_type type, unsi memcpy(out_value, tlv->value_as_string, value_length); *out_size = value_length; - pthread_rwlock_unlock(&cmsg->rwlock); return 0; errout: - pthread_rwlock_unlock(&cmsg->rwlock); return result; } uint16_t tfe_cmsg_serialize_size_get(struct tfe_cmsg *cmsg) { - pthread_rwlock_rdlock(&cmsg->rwlock); uint16_t size = cmsg->size; - pthread_rwlock_unlock(&cmsg->rwlock); return size; } int tfe_cmsg_serialize(struct tfe_cmsg *cmsg, unsigned char *buff, uint16_t bufflen, uint16_t *serialize_len) { //size是serialize之后的实际长度 - pthread_rwlock_rdlock(&cmsg->rwlock); uint16_t size = cmsg->size; //传入buff是否够长 if(bufflen < size) { - pthread_rwlock_unlock(&cmsg->rwlock); return TFE_CMSG_BUFF_NOT_ENOUGH; } //size是否正确 if(size < sizeof(struct tfe_cmsg_serialize_header)) { - pthread_rwlock_unlock(&cmsg->rwlock); return TFE_CMSG_BUFF_NOT_ENOUGH; } struct tfe_cmsg_serialize_header *header = (struct tfe_cmsg_serialize_header*)buff; @@ -204,7 +187,6 @@ int tfe_cmsg_serialize(struct tfe_cmsg *cmsg, unsigned char *buff, uint16_t buff } if(count != cmsg->nr_tlvs) { - pthread_rwlock_unlock(&cmsg->rwlock); return TFE_CMSG_INVALID_FORMAT; } //序列化 @@ -217,13 +199,11 @@ int tfe_cmsg_serialize(struct tfe_cmsg *cmsg, unsigned char *buff, uint16_t buff } if(i != tlv->type) { - pthread_rwlock_unlock(&cmsg->rwlock); return TFE_CMSG_INVALID_FORMAT; } uint16_t length = tlv->length; if(length < sizeof(struct tfe_cmsg_tlv) || offset + length > size) { - pthread_rwlock_unlock(&cmsg->rwlock); return TFE_CMSG_INVALID_FORMAT; } memcpy((char*)header + offset, (void*)tlv, length); @@ -235,11 +215,9 @@ int tfe_cmsg_serialize(struct tfe_cmsg *cmsg, unsigned char *buff, uint16_t buff //检查size是否正确 if(offset != size) { - pthread_rwlock_unlock(&cmsg->rwlock); return TFE_CMSG_INVALID_FORMAT; } *serialize_len = size; - pthread_rwlock_unlock(&cmsg->rwlock); return 0; } @@ -260,7 +238,6 @@ int tfe_cmsg_deserialize(const unsigned char *data, uint16_t len, struct tfe_cms } cmsg = ALLOC(struct tfe_cmsg, 1); - pthread_rwlock_init(&(cmsg->rwlock), NULL); ATOMIC_ZERO(&(cmsg->flag)); ATOMIC_ZERO(&(cmsg->ref)); ATOMIC_INC(&(cmsg->ref)); diff --git a/common/src/tfe_packet_io.cpp b/common/src/tfe_packet_io.cpp index e7cc60e..a7cae4d 100644 --- a/common/src/tfe_packet_io.cpp +++ b/common/src/tfe_packet_io.cpp @@ -24,7 +24,6 @@ #include "tfe_cmsg.h" #include "tfe_tcp_restore.h" #include "tfe_stream.h" -#include "raw_socket.h" #include "packet_construct.h" #include "mpack.h" #include "tap.h" @@ -135,30 +134,6 @@ struct metadata struct route_ctx route_ctx; }; -struct tcp_option_mss { - uint8_t kind; - uint8_t length; - uint16_t mss_value; -} __attribute__((__packed__)); - -struct tcp_option_window_scale { - uint8_t kind; - uint8_t length; - uint8_t shift_count; -} __attribute__((__packed__)); - -struct tcp_option_sack { - uint8_t kind; - uint8_t length; -} __attribute__((__packed__)); - -struct tcp_option_time_stamp { - uint8_t kind; - uint8_t length; - uint32_t tsval; - uint32_t tsecr; -} __attribute__((__packed__)); - extern int tcp_policy_enforce(struct tcp_policy_enforcer *tcp_enforcer, struct tfe_cmsg *cmsg); extern int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstream, int fd_fake_c, int fd_fake_s, struct tfe_cmsg * cmsg); extern void chaining_policy_enforce(struct chaining_policy_enforcer *enforcer, struct tfe_cmsg *cmsg, uint64_t rule_id); @@ -312,233 +287,6 @@ static int add_ether_proto(void *raw_data, uint16_t proto){ return 0; } -static int fake_tcp_handshake(struct tfe_proxy *proxy, struct tcp_restore_info *restore_info) -{ - char buffer[1500] = {0}; - int length = 0; - - char tcp_option_buffer_c[40] = {0}; - char tcp_option_buffer_s[40] = {0}; - char tcp_option_buffer_c2[40] = {0}; - int tcp_option_length_c = 0; - int tcp_option_length_s = 0; - int tcp_option_length_c2 = 0; - - const struct tcp_restore_endpoint *client = &restore_info->client; - const struct tcp_restore_endpoint *server = &restore_info->server; - struct raw_socket *raw_socket_c = raw_socket_create(proxy->traffic_steering_options.device_client, proxy->traffic_steering_options.so_mask_client); - struct raw_socket *raw_socket_s = raw_socket_create(proxy->traffic_steering_options.device_server, proxy->traffic_steering_options.so_mask_server); - if (raw_socket_c == NULL || raw_socket_s == NULL) - { - raw_socket_destory(raw_socket_c); - raw_socket_destory(raw_socket_s); - - return -1; - } - - uint32_t c_seq = client->seq - 1; - uint32_t s_seq = server->seq - 1; - - /* - * Maximum segment size: Kind: 2, Length: 4 - * +---------+---------+---------+ - * | Kind=2 |Length=4 |mss.value| - * +---------+---------+---------+ - * 1 1 2 - */ - if (client->mss && server->mss) - { - struct tcp_option_mss *option_c = (struct tcp_option_mss *)(tcp_option_buffer_c + tcp_option_length_c); - option_c->kind = 2; - option_c->length = 4; - option_c->mss_value = htons(client->mss); - tcp_option_length_c += sizeof(struct tcp_option_mss); - - struct tcp_option_mss *option_s = (struct tcp_option_mss *)(tcp_option_buffer_s + tcp_option_length_s); - option_s->kind = 2; - option_s->length = 4; - option_s->mss_value = htons(server->mss); - tcp_option_length_s += sizeof(struct tcp_option_mss); - } - - /* - * Window Scale option: Kind: 3, Length: 3 - * +---------+---------+---------+ - * | Kind=3 |Length=3 |shift.cnt| - * +---------+---------+---------+ - * 1 1 1 - */ - if (client->wscale_perm && server->wscale_perm) - { - // padding - memset(tcp_option_buffer_c + tcp_option_length_c, 1, 1); - tcp_option_length_c += 1; - memset(tcp_option_buffer_s + tcp_option_length_s, 1, 1); - tcp_option_length_s += 1; - - struct tcp_option_window_scale *option_c = (struct tcp_option_window_scale *)(tcp_option_buffer_c + tcp_option_length_c); - option_c->kind = 3; - option_c->length = 3; - option_c->shift_count = client->wscale; - tcp_option_length_c += sizeof(struct tcp_option_window_scale); - - struct tcp_option_window_scale *option_s = (struct tcp_option_window_scale *)(tcp_option_buffer_s + tcp_option_length_s); - option_s->kind = 3; - option_s->length = 3; - option_s->shift_count = server->wscale; - tcp_option_length_s += sizeof(struct tcp_option_window_scale); - } - - /* - * SACK option: Kind: 4, Length: 2 - * +---------+---------+ - * | Kind=4 |Length=2 | - * +---------+---------+ - * 1 1 - */ - if (client->sack_perm && server->sack_perm) - { - // padding - memset(tcp_option_buffer_c + tcp_option_length_c, 1, 2); - tcp_option_length_c += 2; - memset(tcp_option_buffer_s + tcp_option_length_s, 1, 2); - tcp_option_length_s += 2; - - struct tcp_option_sack *option_c = (struct tcp_option_sack *)(tcp_option_buffer_c + tcp_option_length_c); - option_c->kind = 4; - option_c->length = 2; - tcp_option_length_c += sizeof(struct tcp_option_sack); - - struct tcp_option_sack *option_s = (struct tcp_option_sack *)(tcp_option_buffer_s + tcp_option_length_s); - option_s->kind = 4; - option_s->length = 2; - tcp_option_length_s += sizeof(struct tcp_option_sack); - } - - /* - * Time Stamp option: Kind: 8, Length: 10 - * +---------+---------+-----+-----+ - * | Kind=8 |Length=10|tsval|tsecr| - * +---------+---------+-----+-----+ - * 1 1 4 4 - */ - if (client->timestamp_perm && server->timestamp_perm) - { - // padding - memset(tcp_option_buffer_c + tcp_option_length_c, 1, 2); - tcp_option_length_c += 2; - memset(tcp_option_buffer_s + tcp_option_length_s, 1, 2); - tcp_option_length_s += 2; - memset(tcp_option_buffer_c2 + tcp_option_length_c2, 1, 2); - tcp_option_length_c2 += 2; - - struct tcp_option_time_stamp *option_c = (struct tcp_option_time_stamp *)(tcp_option_buffer_c + tcp_option_length_c); - option_c->kind = 8; - option_c->length = 10; - option_c->tsval = htonl(client->ts_val); - option_c->tsecr = htonl(0); - tcp_option_length_c += sizeof(struct tcp_option_time_stamp); - - struct tcp_option_time_stamp *option_s = (struct tcp_option_time_stamp *)(tcp_option_buffer_s + tcp_option_length_s); - option_s->kind = 8; - option_s->length = 10; - option_s->tsval = htonl(server->ts_val); - option_s->tsecr = htonl(client->ts_val); - tcp_option_length_s += sizeof(struct tcp_option_time_stamp); - - struct tcp_option_time_stamp *option_c2 = (struct tcp_option_time_stamp *)(tcp_option_buffer_c2 + tcp_option_length_c2); - option_c2->kind = 8; - option_c2->length = 10; - option_c2->tsval = htonl(client->ts_val); - option_c2->tsecr = htonl(server->ts_val); - tcp_option_length_c2 += sizeof(struct tcp_option_time_stamp); - } - - if (client->addr.ss_family == AF_INET6) - { - struct sockaddr_in6 *sk_client = (struct sockaddr_in6 *)&client->addr; - struct sockaddr_in6 *sk_server = (struct sockaddr_in6 *)&server->addr; - uint16_t port_client = sk_client->sin6_port; - uint16_t port_server = sk_server->sin6_port; - - // C -> S - length = tcp_packet_v6_construct( - buffer, // buffer - &raw_socket_c->mac_addr, &raw_socket_s->mac_addr, 0, ETH_P_IPV6, // Ether - &sk_client->sin6_addr, &sk_server->sin6_addr, 55, // IPv6 - port_client, port_server, c_seq, 0, TCP_SYN_FLAG, client->window, // TCP Header - tcp_option_buffer_c, tcp_option_length_c, // TCP Options - NULL, 0); // Payload - raw_socket_send(raw_socket_c, buffer, length); - c_seq += 1; - - // S -> C - length = tcp_packet_v6_construct( - buffer, // buffer - &raw_socket_s->mac_addr, &raw_socket_c->mac_addr, 0, ETH_P_IPV6, // Ether - &sk_server->sin6_addr, &sk_client->sin6_addr, 65, // IPv6 - port_server, port_client, s_seq, c_seq, TCP_SYN_FLAG | TCP_ACK_FLAG, server->window, // TCP Header - tcp_option_buffer_s, tcp_option_length_s, // TCP Options - NULL, 0); // Payload - raw_socket_send(raw_socket_s, buffer, length); - s_seq += 1; - - // C -> S - length = tcp_packet_v6_construct( - buffer, // buffer - &raw_socket_c->mac_addr, &raw_socket_s->mac_addr, 0, ETH_P_IPV6, // Ether - &sk_client->sin6_addr, &sk_server->sin6_addr, 55, // IPv6 - port_client, port_server, c_seq, s_seq, TCP_ACK_FLAG, client->window, // TCP Header - tcp_option_buffer_c2, tcp_option_length_c2, // TCP Options - NULL, 0); // Payload - raw_socket_send(raw_socket_c, buffer, length); - } - else - { - struct sockaddr_in *sk_client = (struct sockaddr_in *)&client->addr; - struct sockaddr_in *sk_server = (struct sockaddr_in *)&server->addr; - uint16_t port_client = sk_client->sin_port; - uint16_t port_server = sk_server->sin_port; - - // C -> S - length = tcp_packet_v4_construct( - buffer, // buffer - &raw_socket_c->mac_addr, &raw_socket_s->mac_addr, 0, ETH_P_IP, // Ether - &sk_client->sin_addr, &sk_server->sin_addr, 0, 55, 0x11, // IPv4 - port_client, port_server, c_seq, 0, TCP_SYN_FLAG, client->window, // TCP Header - tcp_option_buffer_c, tcp_option_length_c, // TCP Options - NULL, 0); - raw_socket_send(raw_socket_c, buffer, length); - c_seq += 1; - - // S -> C - length = tcp_packet_v4_construct( - buffer, // buffer - &raw_socket_s->mac_addr, &raw_socket_c->mac_addr, 0, ETH_P_IP, // Ether - &sk_server->sin_addr,&sk_client->sin_addr, 0, 65, 0x12, // IPv4 - port_server, port_client, s_seq, c_seq, TCP_SYN_FLAG | TCP_ACK_FLAG, server->window, // TCP Header - tcp_option_buffer_s, tcp_option_length_s, // TCP Options - NULL, 0); - raw_socket_send(raw_socket_s, buffer, length); - s_seq += 1; - - // C -> S - length = tcp_packet_v4_construct( - buffer, // buffer - &raw_socket_c->mac_addr, &raw_socket_s->mac_addr, 0, ETH_P_IP, // Ether - &sk_client->sin_addr, &sk_server->sin_addr, 0, 55, 0x13, // IPv4 - port_client, port_server, c_seq, s_seq, TCP_ACK_FLAG, client->window, // TCP Header - tcp_option_buffer_c2, tcp_option_length_c2, // TCP Options - NULL, 0); - raw_socket_send(raw_socket_c, buffer, length); - } - - raw_socket_destory(raw_socket_c); - raw_socket_destory(raw_socket_s); - - return 0; -} - static int overwrite_tcp_mss(struct tfe_cmsg *cmsg, struct tcp_restore_info *restore, uint64_t session_id, void *logger) { int ret = 0; @@ -1185,6 +933,53 @@ static void set_passthrough_reason(struct tfe_cmsg *cmsg, char *reason) tfe_cmsg_set_flag(cmsg, TFE_CMSG_FLAG_USER0); } +typedef int tcp_handshake_fn(struct tcp_restore_info *info, struct ether_addr *client_mac, struct ether_addr *server_mac, char *buffer, int size); +static void packet_io_send_fake_pkt(struct packet_io_thread_ctx *thread, struct tcp_restore_info *info, uint64_t session_id, int c2s_is_e2i_dir) +{ + struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; + struct packet_io *packet_io = thread->ref_io; + struct packet_io_fs *packet_io_fs = thread->ret_fs_state; + struct ether_addr *client_mac = (struct ether_addr *)&packet_io->config.tap_c_mac; + struct ether_addr *server_mac = (struct ether_addr *)&packet_io->config.tap_s_mac; + void *logger = thread->logger; + + char buffer[1500]; + struct metadata meta = {0}; + meta.session_id = session_id; + meta.is_decrypted = SET_TRAFFIC_IS_DECRYPTED(0); + meta.is_ctrl_pkt = 0; + meta.l7offset = 0; + meta.sids.num = 2; + meta.sids.elems[0] = acceptor_ctx->sce_sids; + meta.sids.elems[1] = acceptor_ctx->proxy_sids; + + static tcp_handshake_fn *fn[3] = {tfe_tcp_restore_syn_packet, tfe_tcp_restore_synack_packet, tfe_tcp_restore_ack_packet}; + + marsio_buff_t *tx_buffs[3]; + marsio_buff_malloc_global(packet_io->instance, tx_buffs, 3, 0, thread->thread_index); + + for (int i = 0; i < 3; i++) + { + meta.raw_len = fn[i](info, client_mac, server_mac, buffer, sizeof(buffer)); + meta.raw_data = marsio_buff_append(tx_buffs[i], meta.raw_len); + memcpy(meta.raw_data, buffer, meta.raw_len); + + switch (i) + { + case 0: /* fail through */ + case 2: + meta.is_e2i_dir = c2s_is_e2i_dir; + break; + case 1: + meta.is_e2i_dir = !c2s_is_e2i_dir; + break; + } + packet_io_set_metadata(tx_buffs[i], &meta, logger); + throughput_metrics_inc(&packet_io_fs->decrypt_tx, 1, meta.raw_len); + } + marsio_send_burst_with_options(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 3, MARSIO_SEND_OPT_REHASH); +} + // return 0 : success // return -1 : error static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) @@ -1287,13 +1082,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser (STREAM_PROTO_SSL == (enum tfe_stream_proto)stream_protocol_in_char && thread->ref_proxy->traffic_steering_options.enable_steering_ssl) || enable_decrypted_traffic_steering == 1) { - if (fake_tcp_handshake(thread->ref_proxy, &restore_info) == -1) - { - TFE_LOG_ERROR(logger, "%s: session %lu Failed at fake_tcp_handshake()", LOG_TAG_PKTIO, meta->session_id); - is_passthrough = 1; - set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); - goto passthrough; - } + packet_io_send_fake_pkt(thread, &restore_info, meta->session_id, meta->is_e2i_dir); fd_fake_c = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), thread->ref_proxy->traffic_steering_options.device_client, thread->ref_proxy->traffic_steering_options.so_mask_client); if (fd_fake_c < 0) @@ -1314,7 +1103,13 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser } } + stream_common_direction = meta->is_e2i_dir ? 'I' : 'E'; + tfe_cmsg_set(parser->cmsg, TFE_CMSG_COMMON_DIRECTION, (const unsigned char *)&stream_common_direction, sizeof(stream_common_direction)); + snprintf(stream_traceid, 24, "%" PRIu64, meta->session_id); + tfe_cmsg_set(parser->cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char *)stream_traceid, strlen(stream_traceid)); + tfe_cmsg_dup(parser->cmsg); + // 为避免 packet IO thread 与 worker 访问 cmsg 时出现竞争,packet IO thread 必须在调用 tfe_proxy_fds_accept 之前 set cmsg if (tfe_proxy_fds_accept(thread->ref_proxy, fd_downstream, fd_upstream, fd_fake_c, fd_fake_s, parser->cmsg) < 0) { TFE_LOG_ERROR(logger, "%s: session %lu Failed at tfe_proxy_fds_accept()", LOG_TAG_PKTIO, meta->session_id); @@ -1322,14 +1117,6 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); goto passthrough; } - - // E -> I - if (meta->is_e2i_dir) - stream_common_direction = 'I'; - // I -> E - else - stream_common_direction = 'E'; - tfe_cmsg_set(parser->cmsg, TFE_CMSG_COMMON_DIRECTION, (const unsigned char *)&stream_common_direction, sizeof(stream_common_direction)); } else if (parser->intercpet_data & (IS_SINGLE | IS_TUNNEL)) { is_passthrough = 1; @@ -1386,8 +1173,6 @@ passthrough: route_ctx_copy(&s_ctx->raw_meta_e2i->route_ctx, &parser->ack_route_ctx); } - snprintf(stream_traceid, 24, "%" PRIu64 , s_ctx->session_id); - tfe_cmsg_set(parser->cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char *)stream_traceid, strlen(stream_traceid)); TFE_LOG_INFO(logger, "%s: session %lu %s active first", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->session_addr); session_table_insert(thread->session_table, s_ctx->session_id, &(s_ctx->c2s_info.tuple4), s_ctx, session_value_free_cb); @@ -1587,7 +1372,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx add_ether_header(raw_data, packet_io->config.tap_c_mac, packet_io->config.tap_s_mac); throughput_metrics_inc(&packet_io_fs->tap_s_pkt_tx, 1, raw_len); if (packet_io->config.enable_iouring) { - io_uring_submit_write_entry(thread->tap_ctx->io_uring_s, raw_data, raw_len); + io_uring_write(thread->tap_ctx->io_uring_s, raw_data, raw_len); } else { tap_write(thread->tap_ctx->tap_s, raw_data, raw_len, logger); @@ -1598,7 +1383,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx add_ether_header(raw_data, packet_io->config.tap_s_mac, packet_io->config.tap_c_mac); throughput_metrics_inc(&packet_io_fs->tap_c_pkt_tx, 1, raw_len); if (packet_io->config.enable_iouring) { - io_uring_submit_write_entry(thread->tap_ctx->io_uring_c, raw_data, raw_len); + io_uring_write(thread->tap_ctx->io_uring_c, raw_data, raw_len); } else { tap_write(thread->tap_ctx->tap_c, raw_data, raw_len, logger); @@ -1646,7 +1431,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx add_ether_proto(packet_buff, ETH_P_IPV6); if (packet_io->config.enable_iouring) { - io_uring_submit_write_entry(thread->tap_ctx->io_uring_fd, packet_buff, packet_len); + io_uring_write(thread->tap_ctx->io_uring_fd, packet_buff, packet_len); } else { tap_write(thread->tap_ctx->tap_fd, packet_buff, packet_len, logger); @@ -1659,7 +1444,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx // send to tap0 add_ether_header(raw_data, packet_io->config.src_mac, packet_io->config.tap_mac); if (packet_io->config.enable_iouring) { - io_uring_submit_write_entry(thread->tap_ctx->io_uring_fd, raw_data, raw_len); + io_uring_write(thread->tap_ctx->io_uring_fd, raw_data, raw_len); } else { tap_write(thread->tap_ctx->tap_fd, raw_data, raw_len, logger); @@ -1700,10 +1485,6 @@ void tfe_tap_ctx_destory(struct tap_ctx *handler) tap_close(handler->tap_fd); tap_close(handler->tap_c); tap_close(handler->tap_s); - if (handler->buff) { - free(handler->buff); - handler->buff = NULL; - } free(handler); handler = NULL; @@ -2015,7 +1796,7 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args) struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; marsio_buff_t *tx_buffs[1]; - int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index); + int alloc_ret = marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread->thread_index); if (alloc_ret < 0){ TFE_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret = %d, thread_seq = %d", alloc_ret, thread->thread_index); @@ -2099,7 +1880,7 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args) struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; marsio_buff_t *tx_buffs[1]; - int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index); + int alloc_ret = marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread->thread_index); if (alloc_ret < 0){ TFE_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret = %d, thread_seq = %d", alloc_ret, thread->thread_index); @@ -2158,4 +1939,4 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args) add_ether_header(dst, src_mac, dst_mac); throughput_metrics_inc(&packet_io_fs->raw_pkt_tx, 1, packet_len); marsio_send_burst_with_options(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1, MARSIO_SEND_OPT_REHASH); -} +} \ No newline at end of file diff --git a/common/src/tfe_raw_packet.cpp b/common/src/tfe_raw_packet.cpp index 478e2eb..04cbc93 100644 --- a/common/src/tfe_raw_packet.cpp +++ b/common/src/tfe_raw_packet.cpp @@ -65,8 +65,8 @@ struct gtp_hdr * Static API ******************************************************************************/ -static int raw_packet_parser_push(struct raw_pkt_parser *handler, enum layer_type type, uint16_t offset); -static enum parse_status raw_packet_parser_status(struct raw_pkt_parser *handler, const void *data, enum layer_type this_type); +static inline int raw_packet_parser_push(struct raw_pkt_parser *handler, enum layer_type type, uint16_t offset); +static inline enum parse_status raw_packet_parser_status(struct raw_pkt_parser *handler, const void *data, enum layer_type this_type); static const char *ldbc_method_to_string(enum ldbc_method ldbc_method); @@ -76,16 +76,16 @@ static const char *layer_type2str(enum layer_type this_type); static uint16_t parse_gtphdr_len(const struct gtp_hdr *gtph); // parser protocol -static const void *parse_ether(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); -static const void *parse_ipv4(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); -static const void *parse_ipv6(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); -static const void *parse_tcp(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); -static const void *parse_udp(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); -static const void *parse_pppoe_ses(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); -static const void *parse_vxlan(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); -static const void *parse_vlan8021q(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); -static const void *parse_gtpv1_u(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); -static const void *parse_mpls(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); +static inline const void *parse_ether(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); +static inline const void *parse_ipv4(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); +static inline const void *parse_ipv6(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); +static inline const void *parse_tcp(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); +static inline const void *parse_udp(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); +static inline const void *parse_pppoe_ses(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); +static inline const void *parse_vxlan(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); +static inline const void *parse_vlan8021q(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); +static inline const void *parse_gtpv1_u(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); +static inline const void *parse_mpls(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger); /****************************************************************************** * Public API @@ -539,7 +539,7 @@ int raw_traffic_decapsulate(struct raw_pkt_parser *handler, char *raw_data, int // return 0 : success // return -ENOMEM : error -static int raw_packet_parser_push(struct raw_pkt_parser *handler, enum layer_type type, uint16_t offset) +static inline int raw_packet_parser_push(struct raw_pkt_parser *handler, enum layer_type type, uint16_t offset) { struct layer_results *result = &handler->results; @@ -557,7 +557,7 @@ static int raw_packet_parser_push(struct raw_pkt_parser *handler, enum layer_typ // return PARSE_STATUS_CONTINUE // return PARSE_STATUS_STOP -static enum parse_status raw_packet_parser_status(struct raw_pkt_parser *handler, const void *data, enum layer_type this_type) +static inline enum parse_status raw_packet_parser_status(struct raw_pkt_parser *handler, const void *data, enum layer_type this_type) { /* * only when this_type & handler->expect_type is true, @@ -716,7 +716,7 @@ static uint16_t parse_gtphdr_len(const struct gtp_hdr *gtph) return (char *)p_ext_hdr - (char *)gtph; } -static const void *parse_ether(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) +static inline const void *parse_ether(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) { if (length < sizeof(struct ethhdr)) { @@ -760,7 +760,7 @@ static const void *parse_ether(struct raw_pkt_parser *handler, const void *data, } } -static const void *parse_ipv4(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) +static inline const void *parse_ipv4(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) { if (length < sizeof(struct ip)) { @@ -800,7 +800,7 @@ static const void *parse_ipv4(struct raw_pkt_parser *handler, const void *data, } } -static const void *parse_ipv6(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) +static inline const void *parse_ipv6(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) { if (length < sizeof(struct ip6_hdr)) { @@ -840,7 +840,7 @@ static const void *parse_ipv6(struct raw_pkt_parser *handler, const void *data, } } -static const void *parse_tcp(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) +static inline const void *parse_tcp(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) { if (length < sizeof(struct tcphdr)) { @@ -863,7 +863,7 @@ static const void *parse_tcp(struct raw_pkt_parser *handler, const void *data, s return data_next_layer; } -static const void *parse_udp(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) +static inline const void *parse_udp(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) { if (length < sizeof(struct udp_hdr)) { @@ -897,7 +897,7 @@ static const void *parse_udp(struct raw_pkt_parser *handler, const void *data, s } } -static const void *parse_pppoe_ses(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) +static inline const void *parse_pppoe_ses(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) { if (length < 8) { @@ -932,7 +932,7 @@ static const void *parse_pppoe_ses(struct raw_pkt_parser *handler, const void *d } } -static const void *parse_vxlan(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) +static inline const void *parse_vxlan(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) { if (length < sizeof(struct vxlan_hdr)) { @@ -955,7 +955,7 @@ static const void *parse_vxlan(struct raw_pkt_parser *handler, const void *data, return parse_ether(handler, data_next_layer, data_next_length, LAYER_TYPE_ETHER, logger); } -static const void *parse_vlan8021q(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) +static inline const void *parse_vlan8021q(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) { if (length < sizeof(struct vlan_hdr)) { @@ -998,7 +998,7 @@ static const void *parse_vlan8021q(struct raw_pkt_parser *handler, const void *d } } -static const void *parse_gtpv1_u(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) +static inline const void *parse_gtpv1_u(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) { if (length < sizeof(struct gtp_hdr)) { @@ -1036,7 +1036,7 @@ static const void *parse_gtpv1_u(struct raw_pkt_parser *handler, const void *dat } } -static const void *parse_mpls(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) +static inline const void *parse_mpls(struct raw_pkt_parser *handler, const void *data, size_t length, enum layer_type this_type, void *logger) { if (length < 4) { diff --git a/common/src/tfe_tcp_restore.cpp b/common/src/tfe_tcp_restore.cpp index 339921f..0282845 100644 --- a/common/src/tfe_tcp_restore.cpp +++ b/common/src/tfe_tcp_restore.cpp @@ -6,14 +6,15 @@ #include #include #include - +#include #include #include +#include void tfe_tcp_restore_info_dump(const struct tcp_restore_info *info) { - char str_client_addr[64] = { 0 }; - char str_server_addr[64] = { 0 }; + char str_client_addr[64] = {0}; + char str_server_addr[64] = {0}; const struct tcp_restore_endpoint *client = &info->client; const struct tcp_restore_endpoint *server = &info->server; @@ -31,11 +32,11 @@ void tfe_tcp_restore_info_dump(const struct tcp_restore_info *info) inet_ntop(AF_INET, &sk_server->sin_addr, str_server_addr, sizeof(str_client_addr)); TFE_LOG_DEBUG(g_default_logger, "tcp_restore_info %p: cur_dir=%u, %s:%hu->%s:%hu, seq=%u, ack=%u, " - "client={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }, " - "server={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }", - info, info->cur_dir, str_client_addr, port_client, str_server_addr, port_server, info->client.seq, info->client.ack, - client->mss, (client->wscale_perm ? 1 : 0), client->wscale, (client->timestamp_perm ? 1 : 0), (client->sack_perm ? 1 : 0), - server->mss, (server->wscale_perm ? 1 : 0), server->wscale, (server->timestamp_perm ? 1 : 0), (server->sack_perm ? 1 : 0)); + "client={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }, " + "server={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }", + info, info->cur_dir, str_client_addr, port_client, str_server_addr, port_server, info->client.seq, info->client.ack, + client->mss, (client->wscale_perm ? 1 : 0), client->wscale, (client->timestamp_perm ? 1 : 0), (client->sack_perm ? 1 : 0), + server->mss, (server->wscale_perm ? 1 : 0), server->wscale, (server->timestamp_perm ? 1 : 0), (server->sack_perm ? 1 : 0)); } else if (client->addr.ss_family == AF_INET6) { @@ -48,11 +49,11 @@ void tfe_tcp_restore_info_dump(const struct tcp_restore_info *info) inet_ntop(AF_INET6, &sk_server->sin6_addr, str_server_addr, sizeof(str_client_addr)); TFE_LOG_DEBUG(g_default_logger, "tcp_restore_info %p: cur_dir=%u, %s:%hu->%s:%hu, seq=%u, ack=%u, " - "client={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }, " - "server={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }", - info, info->cur_dir, str_client_addr, port_client, str_server_addr, port_server, info->client.seq, info->client.ack, - client->mss, (client->wscale_perm ? 1 : 0), client->wscale, (client->timestamp_perm ? 1 : 0), (client->sack_perm ? 1 : 0), - server->mss, (server->wscale_perm ? 1 : 0), server->wscale, (server->timestamp_perm ? 1 : 0), (server->sack_perm ? 1 : 0)); + "client={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }, " + "server={ mss=%u, wscale_perm=%u, wscale=%u, ts=%u, sack=%u }", + info, info->cur_dir, str_client_addr, port_client, str_server_addr, port_server, info->client.seq, info->client.ack, + client->mss, (client->wscale_perm ? 1 : 0), client->wscale, (client->timestamp_perm ? 1 : 0), (client->sack_perm ? 1 : 0), + server->mss, (server->wscale_perm ? 1 : 0), server->wscale, (server->timestamp_perm ? 1 : 0), (server->sack_perm ? 1 : 0)); } } @@ -65,7 +66,7 @@ int tfe_tcp_restore_fd_create(const struct tcp_restore_endpoint *endpoint, const socklen_t buffer_len = sizeof(buffer); unsigned int nr_tcp_repair_opts = 0; struct tcp_repair_opt tcp_repair_opts[8]; - struct tcp_repair_window tcp_repair_window = { 0 }; + struct tcp_repair_window tcp_repair_window = {0}; if (endpoint->addr.ss_family == AF_INET) { @@ -277,3 +278,358 @@ errout: return -1; } + +struct tcp_option_mss +{ + uint8_t kind; + uint8_t length; + uint16_t mss_value; +} __attribute__((__packed__)); + +struct tcp_option_window_scale +{ + uint8_t kind; + uint8_t length; + uint8_t shift_count; +} __attribute__((__packed__)); + +struct tcp_option_sack +{ + uint8_t kind; + uint8_t length; +} __attribute__((__packed__)); + +struct tcp_option_time_stamp +{ + uint8_t kind; + uint8_t length; + uint32_t tsval; + uint32_t tsecr; +} __attribute__((__packed__)); + +int tfe_tcp_restore_syn_packet(struct tcp_restore_info *restore_info, struct ether_addr *client_mac, struct ether_addr *server_mac, char *buffer, int size) +{ + int length = 0; + char tcp_option_buff[40] = {0}; + int tcp_option_len = 0; + + const struct tcp_restore_endpoint *client = &restore_info->client; + const struct tcp_restore_endpoint *server = &restore_info->server; + + uint32_t c_seq = client->seq - 1; + + /* + * Maximum segment size: Kind: 2, Length: 4 + * +---------+---------+---------+ + * | Kind=2 |Length=4 |mss.value| + * +---------+---------+---------+ + * 1 1 2 + */ + if (client->mss && server->mss) + { + struct tcp_option_mss *option = (struct tcp_option_mss *)(tcp_option_buff + tcp_option_len); + option->kind = 2; + option->length = 4; + option->mss_value = htons(client->mss); + tcp_option_len += sizeof(struct tcp_option_mss); + } + + /* + * Window Scale option: Kind: 3, Length: 3 + * +---------+---------+---------+ + * | Kind=3 |Length=3 |shift.cnt| + * +---------+---------+---------+ + * 1 1 1 + */ + if (client->wscale_perm && server->wscale_perm) + { + // padding + memset(tcp_option_buff + tcp_option_len, 1, 1); + tcp_option_len += 1; + + struct tcp_option_window_scale *option = (struct tcp_option_window_scale *)(tcp_option_buff + tcp_option_len); + option->kind = 3; + option->length = 3; + option->shift_count = client->wscale; + tcp_option_len += sizeof(struct tcp_option_window_scale); + } + + /* + * SACK option: Kind: 4, Length: 2 + * +---------+---------+ + * | Kind=4 |Length=2 | + * +---------+---------+ + * 1 1 + */ + if (client->sack_perm && server->sack_perm) + { + // padding + memset(tcp_option_buff + tcp_option_len, 1, 2); + tcp_option_len += 2; + + struct tcp_option_sack *option = (struct tcp_option_sack *)(tcp_option_buff + tcp_option_len); + option->kind = 4; + option->length = 2; + tcp_option_len += sizeof(struct tcp_option_sack); + } + + /* + * Time Stamp option: Kind: 8, Length: 10 + * +---------+---------+-----+-----+ + * | Kind=8 |Length=10|tsval|tsecr| + * +---------+---------+-----+-----+ + * 1 1 4 4 + */ + if (client->timestamp_perm && server->timestamp_perm) + { + // padding + memset(tcp_option_buff + tcp_option_len, 1, 2); + tcp_option_len += 2; + + struct tcp_option_time_stamp *option = (struct tcp_option_time_stamp *)(tcp_option_buff + tcp_option_len); + option->kind = 8; + option->length = 10; + option->tsval = htonl(client->ts_val); + option->tsecr = htonl(0); + tcp_option_len += sizeof(struct tcp_option_time_stamp); + } + + if (client->addr.ss_family == AF_INET6) + { + struct sockaddr_in6 *sk_client = (struct sockaddr_in6 *)&client->addr; + struct sockaddr_in6 *sk_server = (struct sockaddr_in6 *)&server->addr; + uint16_t port_client = sk_client->sin6_port; + uint16_t port_server = sk_server->sin6_port; + + // C -> S + length = tcp_packet_v6_construct( + buffer, // buffer + client_mac, server_mac, 0, ETH_P_IPV6, // Ether + &sk_client->sin6_addr, &sk_server->sin6_addr, TFE_FAKE_C_DEFAULT_TTL, // IPv6 + port_client, port_server, c_seq, 0, TCP_SYN_FLAG, client->window, // TCP Header + tcp_option_buff, tcp_option_len, // TCP Options + NULL, 0); // Payload + } + else + { + struct sockaddr_in *sk_client = (struct sockaddr_in *)&client->addr; + struct sockaddr_in *sk_server = (struct sockaddr_in *)&server->addr; + uint16_t port_client = sk_client->sin_port; + uint16_t port_server = sk_server->sin_port; + + // C -> S + length = tcp_packet_v4_construct( + buffer, // buffer + client_mac, server_mac, 0, ETH_P_IP, // Ether + &sk_client->sin_addr, &sk_server->sin_addr, 0, TFE_FAKE_C_DEFAULT_TTL, 0x11, // IPv4 + port_client, port_server, c_seq, 0, TCP_SYN_FLAG, client->window, // TCP Header + tcp_option_buff, tcp_option_len, // TCP Options + NULL, 0); + } + + return length; +} + +int tfe_tcp_restore_synack_packet(struct tcp_restore_info *restore_info, struct ether_addr *client_mac, struct ether_addr *server_mac, char *buffer, int size) +{ + int length = 0; + char tcp_option_buff[40] = {0}; + int tcp_option_len = 0; + + const struct tcp_restore_endpoint *client = &restore_info->client; + const struct tcp_restore_endpoint *server = &restore_info->server; + + uint32_t c_seq = client->seq - 1; + uint32_t s_seq = server->seq - 1; + + /* + * Maximum segment size: Kind: 2, Length: 4 + * +---------+---------+---------+ + * | Kind=2 |Length=4 |mss.value| + * +---------+---------+---------+ + * 1 1 2 + */ + if (client->mss && server->mss) + { + struct tcp_option_mss *option = (struct tcp_option_mss *)(tcp_option_buff + tcp_option_len); + option->kind = 2; + option->length = 4; + option->mss_value = htons(server->mss); + tcp_option_len += sizeof(struct tcp_option_mss); + } + + /* + * Window Scale option: Kind: 3, Length: 3 + * +---------+---------+---------+ + * | Kind=3 |Length=3 |shift.cnt| + * +---------+---------+---------+ + * 1 1 1 + */ + if (client->wscale_perm && server->wscale_perm) + { + // padding + memset(tcp_option_buff + tcp_option_len, 1, 1); + tcp_option_len += 1; + + struct tcp_option_window_scale *option = (struct tcp_option_window_scale *)(tcp_option_buff + tcp_option_len); + option->kind = 3; + option->length = 3; + option->shift_count = server->wscale; + tcp_option_len += sizeof(struct tcp_option_window_scale); + } + + /* + * SACK option: Kind: 4, Length: 2 + * +---------+---------+ + * | Kind=4 |Length=2 | + * +---------+---------+ + * 1 1 + */ + if (client->sack_perm && server->sack_perm) + { + // padding + memset(tcp_option_buff + tcp_option_len, 1, 2); + tcp_option_len += 2; + + struct tcp_option_sack *option = (struct tcp_option_sack *)(tcp_option_buff + tcp_option_len); + option->kind = 4; + option->length = 2; + tcp_option_len += sizeof(struct tcp_option_sack); + } + + /* + * Time Stamp option: Kind: 8, Length: 10 + * +---------+---------+-----+-----+ + * | Kind=8 |Length=10|tsval|tsecr| + * +---------+---------+-----+-----+ + * 1 1 4 4 + */ + if (client->timestamp_perm && server->timestamp_perm) + { + // padding + memset(tcp_option_buff + tcp_option_len, 1, 2); + tcp_option_len += 2; + + struct tcp_option_time_stamp *option = (struct tcp_option_time_stamp *)(tcp_option_buff + tcp_option_len); + option->kind = 8; + option->length = 10; + option->tsval = htonl(server->ts_val); + option->tsecr = htonl(client->ts_val); + tcp_option_len += sizeof(struct tcp_option_time_stamp); + } + + if (client->addr.ss_family == AF_INET6) + { + struct sockaddr_in6 *sk_client = (struct sockaddr_in6 *)&client->addr; + struct sockaddr_in6 *sk_server = (struct sockaddr_in6 *)&server->addr; + uint16_t port_client = sk_client->sin6_port; + uint16_t port_server = sk_server->sin6_port; + + c_seq += 1; + + // S -> C + length = tcp_packet_v6_construct( + buffer, // buffer + server_mac, client_mac, 0, ETH_P_IPV6, // Ether + &sk_server->sin6_addr, &sk_client->sin6_addr, TFE_FAKE_S_DEFAULT_TTL, // IPv6 + port_server, port_client, s_seq, c_seq, TCP_SYN_FLAG | TCP_ACK_FLAG, server->window, // TCP Header + tcp_option_buff, tcp_option_len, // TCP Options + NULL, 0); // Payload + } + else + { + struct sockaddr_in *sk_client = (struct sockaddr_in *)&client->addr; + struct sockaddr_in *sk_server = (struct sockaddr_in *)&server->addr; + uint16_t port_client = sk_client->sin_port; + uint16_t port_server = sk_server->sin_port; + + c_seq += 1; + + // S -> C + length = tcp_packet_v4_construct( + buffer, // buffer + server_mac, client_mac, 0, ETH_P_IP, // Ether + &sk_server->sin_addr, &sk_client->sin_addr, 0, TFE_FAKE_S_DEFAULT_TTL, 0x12, // IPv4 + port_server, port_client, s_seq, c_seq, TCP_SYN_FLAG | TCP_ACK_FLAG, server->window, // TCP Header + tcp_option_buff, tcp_option_len, // TCP Options + NULL, 0); + } + + return length; +} + +int tfe_tcp_restore_ack_packet(struct tcp_restore_info *restore_info, struct ether_addr *client_mac, struct ether_addr *server_mac, char *buffer, int size) +{ + int length = 0; + + char tcp_option_buff[40] = {0}; + int tcp_option_len = 0; + + const struct tcp_restore_endpoint *client = &restore_info->client; + const struct tcp_restore_endpoint *server = &restore_info->server; + + uint32_t c_seq = client->seq - 1; + uint32_t s_seq = server->seq - 1; + + /* + * Time Stamp option: Kind: 8, Length: 10 + * +---------+---------+-----+-----+ + * | Kind=8 |Length=10|tsval|tsecr| + * +---------+---------+-----+-----+ + * 1 1 4 4 + */ + if (client->timestamp_perm && server->timestamp_perm) + { + // padding + memset(tcp_option_buff + tcp_option_len, 1, 2); + tcp_option_len += 2; + + struct tcp_option_time_stamp *option = (struct tcp_option_time_stamp *)(tcp_option_buff + tcp_option_len); + option->kind = 8; + option->length = 10; + option->tsval = htonl(client->ts_val); + option->tsecr = htonl(server->ts_val); + tcp_option_len += sizeof(struct tcp_option_time_stamp); + } + + if (client->addr.ss_family == AF_INET6) + { + struct sockaddr_in6 *sk_client = (struct sockaddr_in6 *)&client->addr; + struct sockaddr_in6 *sk_server = (struct sockaddr_in6 *)&server->addr; + uint16_t port_client = sk_client->sin6_port; + uint16_t port_server = sk_server->sin6_port; + + c_seq += 1; + s_seq += 1; + + // C -> S + length = tcp_packet_v6_construct( + buffer, // buffer + client_mac, server_mac, 0, ETH_P_IPV6, // Ether + &sk_client->sin6_addr, &sk_server->sin6_addr, TFE_FAKE_C_DEFAULT_TTL, // IPv6 + port_client, port_server, c_seq, s_seq, TCP_ACK_FLAG, client->window, // TCP Header + tcp_option_buff, tcp_option_len, // TCP Options + NULL, 0); // Payload + } + else + { + struct sockaddr_in *sk_client = (struct sockaddr_in *)&client->addr; + struct sockaddr_in *sk_server = (struct sockaddr_in *)&server->addr; + uint16_t port_client = sk_client->sin_port; + uint16_t port_server = sk_server->sin_port; + + c_seq += 1; + s_seq += 1; + + // C -> S + length = tcp_packet_v4_construct( + buffer, // buffer + client_mac, server_mac, 0, ETH_P_IP, // Ether + &sk_client->sin_addr, &sk_server->sin_addr, 0, TFE_FAKE_C_DEFAULT_TTL, 0x13, // IPv4 + port_client, port_server, c_seq, s_seq, TCP_ACK_FLAG, client->window, // TCP Header + tcp_option_buff, tcp_option_len, // TCP Options + NULL, 0); + } + + return length; +} \ No newline at end of file diff --git a/common/test/CMakeLists.txt b/common/test/CMakeLists.txt index 121fc3d..3150504 100644 --- a/common/test/CMakeLists.txt +++ b/common/test/CMakeLists.txt @@ -30,6 +30,13 @@ add_executable(gtest_raw_packet test_raw_packet.cpp) target_include_directories(gtest_raw_packet PUBLIC ${CMAKE_SOURCE_DIR}/common/include) target_link_libraries(gtest_raw_packet common gtest) +############################################################################### +# test_fixed_buffer +############################################################################### + +#add_executable(test_fixed_buffer test_fixed_buffer.cpp ${CMAKE_SOURCE_DIR}/common/src/fixed_buffer.cpp) +#target_include_directories(test_fixed_buffer PUBLIC ${CMAKE_SOURCE_DIR}/common/include) + ############################################################################### # gtest_discover_tests ############################################################################### @@ -39,3 +46,4 @@ gtest_discover_tests(gtest_addr) gtest_discover_tests(gtest_cmsg) gtest_discover_tests(gtest_session_table) gtest_discover_tests(gtest_raw_packet) +#gtest_discover_tests(test_fixed_buffer) \ No newline at end of file diff --git a/common/test/test_fixed_buffer.cpp b/common/test/test_fixed_buffer.cpp new file mode 100644 index 0000000..82e7fc6 --- /dev/null +++ b/common/test/test_fixed_buffer.cpp @@ -0,0 +1,70 @@ +#include +#include + +static void show_info(const char *str) +{ + printf("********************************\n"); + printf("* %s\n", str); + printf("********************************\n\n"); +} + +int main(int argc, char **argv) +{ + int buffer_size = 2048; + int buffer_num = 3; + + // create + struct iobuffer_pool *pool = iobuffer_pool_create(buffer_size, buffer_num); + if (pool == NULL) + { + return 0; + } + + show_info("After Create"); + iobuffer_pool_print(pool); + + // pop + show_info("After Pop"); + + struct iobuffer *buffer1 = iobuffer_pool_pop(pool); + printf("pop buffer1: %p\n\n", buffer1); + iobuffer_pool_print(pool); + + struct iobuffer *buffer2 = iobuffer_pool_pop(pool); + printf("pop buffer2: %p\n\n", buffer2); + iobuffer_pool_print(pool); + + struct iobuffer *buffer3 = iobuffer_pool_pop(pool); + printf("pop buffer3: %p\n\n", buffer3); + iobuffer_pool_print(pool); + + struct iobuffer *buffer4 = iobuffer_pool_pop(pool); + printf("pop buffer4: %p\n\n", buffer4); + iobuffer_pool_print(pool); + + // push + show_info("After Push"); + iobuffer_pool_push(pool, buffer1); + printf("push buffer1: %p\n\n", buffer1); + iobuffer_pool_print(pool); + + iobuffer_pool_push(pool, buffer2); + printf("push buffer2: %p\n\n", buffer2); + iobuffer_pool_print(pool); + + iobuffer_pool_push(pool, buffer3); + printf("push buffer3: %p\n\n", buffer3); + iobuffer_pool_print(pool); + + iobuffer_pool_push(pool, buffer4); + printf("push buffer4: %p\n\n", buffer4); + iobuffer_pool_print(pool); + + // destory + iobuffer_pool_destory(pool); + pool = NULL; + show_info("After Destory"); + iobuffer_pool_print(pool); + + return 0; +} diff --git a/platform/src/acceptor_kni_v3.cpp b/platform/src/acceptor_kni_v3.cpp index ac50f3b..2011c78 100644 --- a/platform/src/acceptor_kni_v3.cpp +++ b/platform/src/acceptor_kni_v3.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include @@ -282,257 +281,6 @@ static void tcp_restore_info_parse_from_pkt(struct pkt_info *pktinfo, struct tcp } } -struct tcp_option_mss { - uint8_t kind; - uint8_t length; - uint16_t mss_value; -} __attribute__((__packed__)); - -struct tcp_option_window_scale { - uint8_t kind; - uint8_t length; - uint8_t shift_count; -} __attribute__((__packed__)); - -struct tcp_option_sack { - uint8_t kind; - uint8_t length; -} __attribute__((__packed__)); - -struct tcp_option_time_stamp { - uint8_t kind; - uint8_t length; - uint32_t tsval; - uint32_t tsecr; -} __attribute__((__packed__)); - -static int fake_tcp_handshake(struct tfe_proxy *proxy, struct tcp_restore_info *restore_info) -{ - char buffer[1500] = {0}; - int length = 0; - - char tcp_option_buffer_c[40] = {0}; - char tcp_option_buffer_s[40] = {0}; - char tcp_option_buffer_c2[40] = {0}; - int tcp_option_length_c = 0; - int tcp_option_length_s = 0; - int tcp_option_length_c2 = 0; - - const struct tcp_restore_endpoint *client = &restore_info->client; - const struct tcp_restore_endpoint *server = &restore_info->server; - struct raw_socket *raw_socket_c = raw_socket_create(proxy->traffic_steering_options.device_client, proxy->traffic_steering_options.so_mask_client); - struct raw_socket *raw_socket_s = raw_socket_create(proxy->traffic_steering_options.device_server, proxy->traffic_steering_options.so_mask_server); - if (raw_socket_c == NULL || raw_socket_s == NULL) - { - raw_socket_destory(raw_socket_c); - raw_socket_destory(raw_socket_s); - - return -1; - } - - uint32_t c_seq = client->seq - 1; - uint32_t s_seq = server->seq - 1; - - /* - * Maximum segment size: Kind: 2, Length: 4 - * +---------+---------+---------+ - * | Kind=2 |Length=4 |mss.value| - * +---------+---------+---------+ - * 1 1 2 - */ - if (client->mss && server->mss) - { - struct tcp_option_mss *option_c = (struct tcp_option_mss *)(tcp_option_buffer_c + tcp_option_length_c); - option_c->kind = 2; - option_c->length = 4; - option_c->mss_value = htons(client->mss); - tcp_option_length_c += sizeof(struct tcp_option_mss); - - struct tcp_option_mss *option_s = (struct tcp_option_mss *)(tcp_option_buffer_s + tcp_option_length_s); - option_s->kind = 2; - option_s->length = 4; - option_s->mss_value = htons(server->mss); - tcp_option_length_s += sizeof(struct tcp_option_mss); - } - - /* - * Window Scale option: Kind: 3, Length: 3 - * +---------+---------+---------+ - * | Kind=3 |Length=3 |shift.cnt| - * +---------+---------+---------+ - * 1 1 1 - */ - if (client->wscale_perm && server->wscale_perm) - { - // padding - memset(tcp_option_buffer_c + tcp_option_length_c, 1, 1); - tcp_option_length_c += 1; - memset(tcp_option_buffer_s + tcp_option_length_s, 1, 1); - tcp_option_length_s += 1; - - struct tcp_option_window_scale *option_c = (struct tcp_option_window_scale *)(tcp_option_buffer_c + tcp_option_length_c); - option_c->kind = 3; - option_c->length = 3; - option_c->shift_count = client->wscale; - tcp_option_length_c += sizeof(struct tcp_option_window_scale); - - struct tcp_option_window_scale *option_s = (struct tcp_option_window_scale *)(tcp_option_buffer_s + tcp_option_length_s); - option_s->kind = 3; - option_s->length = 3; - option_s->shift_count = server->wscale; - tcp_option_length_s += sizeof(struct tcp_option_window_scale); - } - - /* - * SACK option: Kind: 4, Length: 2 - * +---------+---------+ - * | Kind=4 |Length=2 | - * +---------+---------+ - * 1 1 - */ - if (client->sack_perm && server->sack_perm) - { - // padding - memset(tcp_option_buffer_c + tcp_option_length_c, 1, 2); - tcp_option_length_c += 2; - memset(tcp_option_buffer_s + tcp_option_length_s, 1, 2); - tcp_option_length_s += 2; - - struct tcp_option_sack *option_c = (struct tcp_option_sack *)(tcp_option_buffer_c + tcp_option_length_c); - option_c->kind = 4; - option_c->length = 2; - tcp_option_length_c += sizeof(struct tcp_option_sack); - - struct tcp_option_sack *option_s = (struct tcp_option_sack *)(tcp_option_buffer_s + tcp_option_length_s); - option_s->kind = 4; - option_s->length = 2; - tcp_option_length_s += sizeof(struct tcp_option_sack); - } - - /* - * Time Stamp option: Kind: 8, Length: 10 - * +---------+---------+-----+-----+ - * | Kind=8 |Length=10|tsval|tsecr| - * +---------+---------+-----+-----+ - * 1 1 4 4 - */ - if (client->timestamp_perm && server->timestamp_perm) - { - // padding - memset(tcp_option_buffer_c + tcp_option_length_c, 1, 2); - tcp_option_length_c += 2; - memset(tcp_option_buffer_s + tcp_option_length_s, 1, 2); - tcp_option_length_s += 2; - memset(tcp_option_buffer_c2 + tcp_option_length_c2, 1, 2); - tcp_option_length_c2 += 2; - - struct tcp_option_time_stamp *option_c = (struct tcp_option_time_stamp *)(tcp_option_buffer_c + tcp_option_length_c); - option_c->kind = 8; - option_c->length = 10; - option_c->tsval = htonl(client->ts_val); - option_c->tsecr = htonl(0); - tcp_option_length_c += sizeof(struct tcp_option_time_stamp); - - struct tcp_option_time_stamp *option_s = (struct tcp_option_time_stamp *)(tcp_option_buffer_s + tcp_option_length_s); - option_s->kind = 8; - option_s->length = 10; - option_s->tsval = htonl(server->ts_val); - option_s->tsecr = htonl(client->ts_val); - tcp_option_length_s += sizeof(struct tcp_option_time_stamp); - - struct tcp_option_time_stamp *option_c2 = (struct tcp_option_time_stamp *)(tcp_option_buffer_c2 + tcp_option_length_c2); - option_c2->kind = 8; - option_c2->length = 10; - option_c2->tsval = htonl(client->ts_val); - option_c2->tsecr = htonl(server->ts_val); - tcp_option_length_c2 += sizeof(struct tcp_option_time_stamp); - } - - if (client->addr.ss_family == AF_INET6) - { - struct sockaddr_in6 *sk_client = (struct sockaddr_in6 *)&client->addr; - struct sockaddr_in6 *sk_server = (struct sockaddr_in6 *)&server->addr; - uint16_t port_client = sk_client->sin6_port; - uint16_t port_server = sk_server->sin6_port; - - // C -> S - length = tcp_packet_v6_construct( - buffer, // buffer - &raw_socket_c->mac_addr, &raw_socket_s->mac_addr, 0, ETH_P_IPV6, // Ether - &sk_client->sin6_addr, &sk_server->sin6_addr, 55, // IPv6 - port_client, port_server, c_seq, 0, TCP_SYN_FLAG, client->window, // TCP Header - tcp_option_buffer_c, tcp_option_length_c, // TCP Options - NULL, 0); // Payload - raw_socket_send(raw_socket_c, buffer, length); - c_seq += 1; - - // S -> C - length = tcp_packet_v6_construct( - buffer, // buffer - &raw_socket_s->mac_addr, &raw_socket_c->mac_addr, 0, ETH_P_IPV6, // Ether - &sk_server->sin6_addr, &sk_client->sin6_addr, 65, // IPv6 - port_server, port_client, s_seq, c_seq, TCP_SYN_FLAG | TCP_ACK_FLAG, server->window, // TCP Header - tcp_option_buffer_s, tcp_option_length_s, // TCP Options - NULL, 0); // Payload - raw_socket_send(raw_socket_s, buffer, length); - s_seq += 1; - - // C -> S - length = tcp_packet_v6_construct( - buffer, // buffer - &raw_socket_c->mac_addr, &raw_socket_s->mac_addr, 0, ETH_P_IPV6, // Ether - &sk_client->sin6_addr, &sk_server->sin6_addr, 55, // IPv6 - port_client, port_server, c_seq, s_seq, TCP_ACK_FLAG, client->window, // TCP Header - tcp_option_buffer_c2, tcp_option_length_c2, // TCP Options - NULL, 0); // Payload - raw_socket_send(raw_socket_c, buffer, length); - } - else - { - struct sockaddr_in *sk_client = (struct sockaddr_in *)&client->addr; - struct sockaddr_in *sk_server = (struct sockaddr_in *)&server->addr; - uint16_t port_client = sk_client->sin_port; - uint16_t port_server = sk_server->sin_port; - - // C -> S - length = tcp_packet_v4_construct( - buffer, // buffer - &raw_socket_c->mac_addr, &raw_socket_s->mac_addr, 0, ETH_P_IP, // Ether - &sk_client->sin_addr, &sk_server->sin_addr, 0, 55, 0x11, // IPv4 - port_client, port_server, c_seq, 0, TCP_SYN_FLAG, client->window, // TCP Header - tcp_option_buffer_c, tcp_option_length_c, // TCP Options - NULL, 0); - raw_socket_send(raw_socket_c, buffer, length); - c_seq += 1; - - // S -> C - length = tcp_packet_v4_construct( - buffer, // buffer - &raw_socket_s->mac_addr, &raw_socket_c->mac_addr, 0, ETH_P_IP, // Ether - &sk_server->sin_addr,&sk_client->sin_addr, 0, 65, 0x12, // IPv4 - port_server, port_client, s_seq, c_seq, TCP_SYN_FLAG | TCP_ACK_FLAG, server->window, // TCP Header - tcp_option_buffer_s, tcp_option_length_s, // TCP Options - NULL, 0); - raw_socket_send(raw_socket_s, buffer, length); - s_seq += 1; - - // C -> S - length = tcp_packet_v4_construct( - buffer, // buffer - &raw_socket_c->mac_addr, &raw_socket_s->mac_addr, 0, ETH_P_IP, // Ether - &sk_client->sin_addr, &sk_server->sin_addr, 0, 55, 0x13, // IPv4 - port_client, port_server, c_seq, s_seq, TCP_ACK_FLAG, client->window, // TCP Header - tcp_option_buffer_c2, tcp_option_length_c2, // TCP Options - NULL, 0); - raw_socket_send(raw_socket_c, buffer, length); - } - - raw_socket_destory(raw_socket_c); - raw_socket_destory(raw_socket_s); - - return 0; -} - static int overwrite_tcp_mss(struct tfe_cmsg *cmsg, struct tcp_restore_info *restore) { int ret = 0; @@ -588,8 +336,6 @@ static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, s int ret = 0; int fd_downstream = 0; int fd_upstream = 0; - int fd_fake_c = 0; - int fd_fake_s = 0; int hit_tcpopt = 0; uint16_t cmsg_offset = 0; uint8_t restore_opt_len = 0; @@ -603,7 +349,6 @@ static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, s struct pkt_info pktinfo; struct tcp_restore_info restore_info; uint8_t stream_protocol_in_char = 0; - uint8_t enalbe_decrypted_traffic_steering = 0; uint16_t size = 0; // uint64_t chaining_rule_id = 0; // only use for acceptv4 struct acceptor_kni_v3 *__ctx = (struct acceptor_kni_v3 *)data; @@ -746,33 +491,7 @@ static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, s tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (unsigned char *)&stream_protocol_in_char, sizeof(stream_protocol_in_char), &size); // tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_DECRYPTED_TRAFFIC_STEERING, (unsigned char *)&enalbe_decrypted_traffic_steering, sizeof(enalbe_decrypted_traffic_steering), &size); - if (steering_device_is_available() && ( - (STREAM_PROTO_PLAIN == (enum tfe_stream_proto)stream_protocol_in_char && __ctx->proxy->traffic_steering_options.enable_steering_http) || - (STREAM_PROTO_SSL == (enum tfe_stream_proto)stream_protocol_in_char && __ctx->proxy->traffic_steering_options.enable_steering_ssl) || - enalbe_decrypted_traffic_steering == 1)) - { - if (fake_tcp_handshake(__ctx->proxy, &restore_info) == -1) - { - TFE_LOG_ERROR(g_default_logger, "Failed at fake_tcp_handshake()"); - goto end; - } - - fd_fake_c = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), __ctx->proxy->traffic_steering_options.device_client, __ctx->proxy->traffic_steering_options.so_mask_client); - if (fd_fake_c < 0) - { - TFE_LOG_ERROR(g_default_logger, "Failed at tcp_restore_fd_create(fd_fake_c)"); - goto end; - } - - fd_fake_s = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client), __ctx->proxy->traffic_steering_options.device_server, __ctx->proxy->traffic_steering_options.so_mask_server); - if (fd_fake_s < 0) - { - TFE_LOG_ERROR(g_default_logger, "Failed at tcp_restore_fd_create(fd_fake_s)"); - goto end; - } - } - - if (tfe_proxy_fds_accept(__ctx->proxy, fd_downstream, fd_upstream, fd_fake_c, fd_fake_s, cmsg) < 0) + if (tfe_proxy_fds_accept(__ctx->proxy, fd_downstream, fd_upstream, 0, 0, cmsg) < 0) { TFE_LOG_ERROR(g_default_logger, "Failed at tfe_proxy_fds_accept()"); goto end; @@ -800,14 +519,6 @@ static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, s return nfq_set_verdict(qh, id, NF_ACCEPT, pktinfo.ip_totlen, raw_payload); end: - if (fd_fake_c > 0) - { - close(fd_fake_c); - } - if (fd_fake_s > 0) - { - close(fd_fake_s); - } if (fd_upstream > 0) { TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL, 1); diff --git a/platform/src/acceptor_kni_v4.cpp b/platform/src/acceptor_kni_v4.cpp index f054262..941a084 100644 --- a/platform/src/acceptor_kni_v4.cpp +++ b/platform/src/acceptor_kni_v4.cpp @@ -126,19 +126,29 @@ static void *worker_thread_cycle(void *arg) { struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)arg; struct packet_io *handle = thread_ctx->ref_io; - void * logger = thread_ctx->logger; + void *logger = thread_ctx->logger; +#define MAX_REBUFF_SIZE 2048 + char buffer[MAX_REBUFF_SIZE]; int pkg_len = 0; char thread_name[16]; - int n_pkt_recv_from_nf = 0; - int n_pkt_recv_from_tap = 0; - int n_pkt_recv_from_tap_c = 0; - int n_pkt_recv_from_tap_s = 0; + int n_pkt_recv = 0; + int thread_index = thread_ctx->thread_index; + int using_iouring_mode = is_enable_iouring(handle); - snprintf(thread_name, sizeof(thread_name), "pkt:worker-%d", thread_ctx->thread_index); + int fd_on_tap_0 = thread_ctx->tap_ctx->tap_fd; + int fd_on_tap_c = thread_ctx->tap_ctx->tap_c; + int fd_on_tap_s = thread_ctx->tap_ctx->tap_s; + + struct io_uring_instance *io_uring_on_tap_0 = thread_ctx->tap_ctx->io_uring_fd; + struct io_uring_instance *io_uring_on_tap_c = thread_ctx->tap_ctx->io_uring_c; + struct io_uring_instance *io_uring_on_tap_s = thread_ctx->tap_ctx->io_uring_s; + + snprintf(thread_name, sizeof(thread_name), "pkt:worker-%d", thread_index); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); - while (!worker_thread_ready) { + while (!worker_thread_ready) + { sleep(1); } @@ -147,56 +157,59 @@ static void *worker_thread_cycle(void *arg) goto error_out; } - if (is_enable_iouring(handle)) { - io_uring_register_read_callback(thread_ctx->tap_ctx->io_uring_fd, handle_raw_packet_from_tap, thread_ctx); - io_uring_register_read_callback(thread_ctx->tap_ctx->io_uring_c, handle_decryption_packet_from_tap, thread_ctx); - io_uring_register_read_callback(thread_ctx->tap_ctx->io_uring_s, handle_decryption_packet_from_tap, thread_ctx); - } - else { - thread_ctx->tap_ctx->buff_size = 3000; - thread_ctx->tap_ctx->buff = ALLOC(char, thread_ctx->tap_ctx->buff_size); + if (using_iouring_mode) + { + io_uring_set_read_cb(io_uring_on_tap_0, handle_raw_packet_from_tap, thread_ctx); + io_uring_set_read_cb(io_uring_on_tap_c, handle_decryption_packet_from_tap, thread_ctx); + io_uring_set_read_cb(io_uring_on_tap_s, handle_decryption_packet_from_tap, thread_ctx); } - TFE_LOG_INFO(logger, "%s: worker thread %d is running", "LOG_TAG_KNI", thread_ctx->thread_index); + TFE_LOG_INFO(logger, "%s: worker thread %d is running", "LOG_TAG_KNI", thread_index); - while(1) { - n_pkt_recv_from_nf = packet_io_polling_nf_interface(handle, thread_ctx->thread_index, thread_ctx); - if (is_enable_iouring(handle)) { - n_pkt_recv_from_tap = io_uring_peek_ready_entrys(thread_ctx->tap_ctx->io_uring_fd); - n_pkt_recv_from_tap_c = io_uring_peek_ready_entrys(thread_ctx->tap_ctx->io_uring_c); - n_pkt_recv_from_tap_s = io_uring_peek_ready_entrys(thread_ctx->tap_ctx->io_uring_s); + while (1) + { + n_pkt_recv = packet_io_polling_nf_interface(handle, thread_index, thread_ctx); + if (using_iouring_mode) + { + n_pkt_recv += io_uring_polling(io_uring_on_tap_0); + n_pkt_recv += io_uring_polling(io_uring_on_tap_c); + n_pkt_recv += io_uring_polling(io_uring_on_tap_s); } - else { - if ((pkg_len = tap_read(thread_ctx->tap_ctx->tap_fd, thread_ctx->tap_ctx->buff, thread_ctx->tap_ctx->buff_size, logger)) > 0) + else + { + if ((pkg_len = tap_read(fd_on_tap_0, buffer, MAX_REBUFF_SIZE, logger)) > 0) { - handle_raw_packet_from_tap(thread_ctx->tap_ctx->buff, pkg_len, thread_ctx); + n_pkt_recv++; + handle_raw_packet_from_tap(buffer, pkg_len, thread_ctx); } - if ((pkg_len = tap_read(thread_ctx->tap_ctx->tap_c, thread_ctx->tap_ctx->buff, thread_ctx->tap_ctx->buff_size, logger)) > 0) + if ((pkg_len = tap_read(fd_on_tap_c, buffer, MAX_REBUFF_SIZE, logger)) > 0) { - handle_decryption_packet_from_tap(thread_ctx->tap_ctx->buff, pkg_len, thread_ctx); + n_pkt_recv++; + handle_decryption_packet_from_tap(buffer, pkg_len, thread_ctx); } - if ((pkg_len = tap_read(thread_ctx->tap_ctx->tap_s, thread_ctx->tap_ctx->buff, thread_ctx->tap_ctx->buff_size, logger)) > 0) + if ((pkg_len = tap_read(fd_on_tap_s, buffer, MAX_REBUFF_SIZE, logger)) > 0) { - handle_decryption_packet_from_tap(thread_ctx->tap_ctx->buff, pkg_len, thread_ctx); + n_pkt_recv++; + handle_decryption_packet_from_tap(buffer, pkg_len, thread_ctx); } } - if (n_pkt_recv_from_nf == 0 && n_pkt_recv_from_tap == 0 && n_pkt_recv_from_tap_c == 0 && n_pkt_recv_from_tap_s == 0) + if (n_pkt_recv == 0) { packet_io_thread_wait(handle, thread_ctx, -1); } - if (__atomic_fetch_add(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED) > 0) + if (ATOMIC_READ(&thread_ctx->session_table_need_reset) > 0) { session_table_reset(thread_ctx->session_table); - __atomic_fetch_and(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED); + ATOMIC_ZERO(&thread_ctx->session_table_need_reset); } } error_out: - TFE_LOG_ERROR(logger, "%s: worker thread %d exiting", LOG_TAG_SCE, thread_ctx->thread_index); + TFE_LOG_ERROR(logger, "%s: worker thread %d exiting", LOG_TAG_SCE, thread_index); return (void *)NULL; } diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index 60db7cb..46fb62a 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -1320,7 +1320,7 @@ static void ssl_server_connected_eventcb(struct bufferevent * bev, short events, jiffies_ms=(ctx->end.tv_sec-ctx->start.tv_sec)*1000+(ctx->end.tv_nsec-ctx->start.tv_nsec)/1000000; if(jiffies_ms>LATENCY_WARNING_THRESHOLD_MS) { - TFE_LOG_ERROR(mgr->logger, "Warning: ssl connect server latency %ld ms: addr=%s, sni=%s", + TFE_LOG_INFO(mgr->logger, "Warning: ssl connect server latency %ld ms: addr=%s, sni=%s", jiffies_ms, s_stream->tcp_stream->str_stream_info, s_upstream->client_hello->sni); @@ -1948,7 +1948,7 @@ static void ssl_client_connected_eventcb(struct bufferevent * bev, short events, jiffies_ms=(ctx->end.tv_sec-ctx->start.tv_sec)*1000+(ctx->end.tv_nsec-ctx->start.tv_nsec)/1000000; if(jiffies_ms>LATENCY_WARNING_THRESHOLD_MS) { - TFE_LOG_ERROR(mgr->logger, "Warning: ssl connect client latency %ld ms: addr=%s, sni=%s", + TFE_LOG_INFO(mgr->logger, "Warning: ssl connect client latency %ld ms: addr=%s, sni=%s", jiffies_ms, s_stream->tcp_stream->str_stream_info, s_upstream->client_hello->sni); diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index be895a0..439908b 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -1603,7 +1603,7 @@ static void get_tcp_option_from_cmsg(struct tfe_cmsg *cmsg, struct tfe_tcp_optio } } -void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket_t fd, tfe_conn_dir dir) +void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket_t fd, tfe_conn_dir dir, int overwrite_ttl) { struct tfe_stream * stream = &_stream->head; struct tfe_proxy_tcp_options * tcp_options = &_stream->proxy_ref->tcp_options; @@ -1720,21 +1720,31 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket errno = 0; } - if (options.tcp_ttl > 0) + if (overwrite_ttl > 0) { - if (__fd_ttl_option_setup(_stream, fd, options.tcp_ttl) < 0) + if (__fd_ttl_option_setup(_stream, fd, overwrite_ttl) < 0) { TFE_LOG_ERROR(g_default_logger, "%s: Failed at setup FD's ttl option, ttl = %d, fd = %d", - stream->str_stream_info, options.tcp_ttl, fd); + stream->str_stream_info, overwrite_ttl, fd); + } + } + else + { + if (options.tcp_ttl > 0) + { + if (__fd_ttl_option_setup(_stream, fd, options.tcp_ttl) < 0) + { + TFE_LOG_ERROR(g_default_logger, "%s: Failed at setup FD's ttl option, ttl = %d, fd = %d", + stream->str_stream_info, options.tcp_ttl, fd); + } } } TFE_LOG_DEBUG(g_default_logger, - "%p %s %s: fetch tcp options, nodelay: %d ttl: %d keepalive: %d keepcnt: %d keepidle: %d keepintvl: %d user_timeout: %d", - stream, stream->str_stream_info, (dir == CONN_DIR_DOWNSTREAM ? "downstream" : "upstream"), - options.tcp_nodelay, options.tcp_ttl, options.tcp_keepalive, - options.tcp_keepcnt, options.tcp_keepidle, options.tcp_keepintvl, options.tcp_user_timeout); - + "%p %s %s: fetch tcp options, nodelay: %d ttl: %d keepalive: %d keepcnt: %d keepidle: %d keepintvl: %d user_timeout: %d", + stream, stream->str_stream_info, (dir == CONN_DIR_DOWNSTREAM ? "downstream" : "upstream"), + options.tcp_nodelay, overwrite_ttl > 0 ? overwrite_ttl : options.tcp_ttl, options.tcp_keepalive, + options.tcp_keepcnt, options.tcp_keepidle, options.tcp_keepintvl, options.tcp_user_timeout); } int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream, evutil_socket_t fd_fake_c, evutil_socket_t fd_fake_s) @@ -1763,13 +1773,13 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); stream->str_stream_info = _stream->str_stream_addr; - __stream_fd_option_setup(_stream, fd_downstream, CONN_DIR_DOWNSTREAM); - __stream_fd_option_setup(_stream, fd_upstream, CONN_DIR_UPSTREAM); + __stream_fd_option_setup(_stream, fd_downstream, CONN_DIR_DOWNSTREAM, 0); + __stream_fd_option_setup(_stream, fd_upstream, CONN_DIR_UPSTREAM, 0); if (_stream->is_decrypted_traffic_steering) { - __stream_fd_option_setup(_stream, fd_fake_s, CONN_DIR_UPSTREAM); - __stream_fd_option_setup(_stream, fd_fake_c, CONN_DIR_DOWNSTREAM); + __stream_fd_option_setup(_stream, fd_fake_s, CONN_DIR_UPSTREAM, TFE_FAKE_S_DEFAULT_TTL); + __stream_fd_option_setup(_stream, fd_fake_c, CONN_DIR_DOWNSTREAM, TFE_FAKE_C_DEFAULT_TTL); _stream->conn_fake_s = __conn_private_create_by_fake_fd(_stream, fd_fake_s); if (_stream->conn_fake_s == NULL)