TSG-14938 TFE支持新控制报文格式; 调整代码结构

This commit is contained in:
wangmenglan
2023-04-28 16:18:32 +08:00
parent 8a7c196c20
commit 8de8ec1c5f
22 changed files with 16372 additions and 1069 deletions

View File

@@ -3,13 +3,14 @@
src/tfe_rpc.cpp src/tfe_cmsg.cpp src/tfe_kafka_logger.cpp src/tfe_resource.cpp src/tfe_scan.cpp
src/tfe_pkt_util.cpp src/tfe_tcp_restore.cpp src/raw_socket.cpp src/packet_construct.cpp
src/tap.cpp src/io_uring.cpp src/intercept_policy.cpp src/tfe_fieldstat.cpp
src/tfe_addr_tuple4.cpp src/tfe_packet_io.cpp src/tfe_session_table.cpp src/tfe_timestamp.cpp
src/tfe_acceptor_kni.cpp src/tfe_ctrl_packet.cpp src/tfe_raw_packet.cpp
src/tfe_mpack.cpp src/mpack.cpp src/tfe_tap_rss.cpp src/tfe_metrics.cpp)
src/tfe_addr_tuple4.cpp src/tfe_packet_io.cpp src/tfe_session_table.cpp
src/tfe_ctrl_packet.cpp src/tfe_raw_packet.cpp
src/tfe_mpack.cpp src/tfe_metrics.cpp src/mpack.cpp)
target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)
target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/../bpf/)
target_include_directories(common PRIVATE ${CMAKE_CURRENT_LIST_DIR}/../platform/include/internal)
target_link_libraries(common PUBLIC libevent-static libevent-static-openssl libevent-static-pthreads rdkafka)
target_link_libraries(common PUBLIC MESA_handle_logger cjson msgpack)
target_link_libraries(common PUBLIC MESA_handle_logger cjson bpf_obj)
target_link_libraries(common PUBLIC pthread)
if (SUPPORT_LIBURING)

8207
common/include/mpack.h Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -12,6 +12,8 @@ void tap_close(int sockfd);
int tap_up_link(const char *eth);
int tap_get_mtu(const char *eth);
int tap_set_rps(const char *eth, int thread_index, const char *rps_mask);
/* FIX: guard was misspelled "__cpluscplus", so this closing brace of the
 * extern "C" block was never compiled under C++ (presumably the matching
 * opener uses the same misspelled guard — verify at the top of this header).
 * The net effect was that C++ consumers got C++ name mangling on these
 * symbols and failed to link against the C implementations. */
#ifdef __cplusplus
}
#endif

View File

@@ -1,147 +0,0 @@
#ifndef _TFE_ACCEPTOR_KNI_H
#define _TFE_ACCEPTOR_KNI_H
/* FIX: the guard below was misspelled "__cpluscplus" (here and at the end of
 * the file), so the extern "C" linkage block was silently skipped whenever
 * this header was included from a C++ translation unit — the declared symbols
 * got C++ name mangling and failed to link against the C definitions. */
#ifdef __cplusplus
extern "C"
{
#endif
#include <sched.h>
#include <pthread.h>
#include "tfe_utils.h"
#include "tfe_timestamp.h"
#include "tfe_packet_io.h"
#include "tfe_session_table.h"
/******************************************************************************
 * Struct For tap
 ******************************************************************************/
/* Static TAP configuration; populated from a profile file by
 * tfe_tap_config_create() (declared in tfe_tap_rss.h). */
struct tap_config
{
    int enable_iouring;                  /* non-zero: use io_uring for TAP I/O */
    int enable_debuglog;
    int ring_size;                       /* io_uring ring size — TODO confirm units */
    int buff_size;                       /* per-thread packet buffer size, bytes */
    int flags;
    int sq_thread_idle;
    char src_mac[6];                     /* 6-octet MAC addresses */
    char tap_mac[6];
    char tap_c_mac[6];                   /* presumably client-side TAP MAC — verify */
    char tap_s_mac[6];                   /* presumably server-side TAP MAC — verify */
    char tap_device[16];                 /* interface names (IFNAMSIZ-sized buffers) */
    char tap_c_device[16];
    char tap_s_device[16];
    int tap_rps_enable;                  /* non-zero: program rps_cpus per RX queue */
    char tap_rps_mask[TFE_SYMBOL_MAX];   /* hex CPU mask written to rps_cpus */
    struct bpf_ctx *tap_bpf_ctx;         /* shared RSS BPF program context */
};
/* Per-thread TAP descriptors plus optional io_uring instances. */
struct tap_ctx
{
    int tap_s;                           /* server-side TAP fd */
    int tap_c;                           /* client-side TAP fd */
    int tap_fd;                          /* single-device TAP fd */
    struct io_uring_instance *io_uring_fd;
    struct io_uring_instance *io_uring_c;
    struct io_uring_instance *io_uring_s;
    int buff_size;                       /* size of buff in bytes */
    char *buff;                          /* receive scratch buffer */
};
/******************************************************************************
 * Struct For Thread
 ******************************************************************************/
/* One worker thread of the acceptor; "ref_" members are borrowed pointers
 * owned elsewhere and must not be freed here. */
struct acceptor_thread_ctx
{
    pthread_t tid;
    int thread_index;                    /* 0-based worker index */
    struct tap_ctx *tap_ctx;
    struct session_table *session_table;
    struct sf_metrics *sf_metrics;
    struct tap_config *ref_tap_config;
    struct packet_io *ref_io;
    struct global_metrics *ref_metrics;
    struct policy_enforcer *ref_enforcer;
    struct acceptor_ctx *ref_acceptor_ctx;
    struct tfe_proxy *ref_proxy;
    int session_table_need_reset;        /* flag: rebuild session table on next pass */
};
/******************************************************************************
 * Struct For Session
 ******************************************************************************/
/* Cached per-direction packet header information. */
struct packet_info
{
    int is_e2i_dir;                      /* non-zero: external-to-internal direction */
    struct addr_tuple4 tuple4;           /* addr/port 4-tuple of the flow */
    char *header_data;
    int header_len;
};
/* Per-session state; owns cmsg (freed by session_ctx_free). */
struct session_ctx
{
    int policy_ids;
    uint64_t session_id;
    char *session_addr;
    char client_mac[6];
    char server_mac[6];
    struct packet_info c2s_info;         /* client-to-server direction */
    struct packet_info s2c_info;         /* server-to-client direction */
    struct metadata *raw_meta_i2e;
    struct metadata *raw_meta_e2i;
    struct metadata *ctrl_meta;
    struct tfe_cmsg *cmsg;               /* owned; destroyed in session_ctx_free */
    struct acceptor_thread_ctx *ref_thread_ctx;  /* borrowed back-pointer */
};
/* Allocate a zeroed session context; aborts (assert) on OOM. */
struct session_ctx *session_ctx_new();
/* Destroy a session context and its owned cmsg; NULL is a no-op. */
void session_ctx_free(struct session_ctx *ctx);
/******************************************************************************
 * Struct For KNI
 ******************************************************************************/
/* Top-level acceptor state built from a profile by acceptor_ctx_create(). */
struct acceptor_ctx
{
    int firewall_sids;
    int sce_sids;
    int nr_worker_threads;               /* clamped to TFE_THREAD_MAX */
    int cpu_affinity_mask[TFE_THREAD_MAX];
    cpu_set_t coremask;                  /* union of per-thread CPU affinities */
    struct tap_config *config;           /* owned */
    struct packet_io *io;                /* owned */
    struct global_metrics *metrics;      /* owned */
    struct acceptor_thread_ctx work_threads[TFE_THREAD_MAX];
    struct tfe_proxy *ref_proxy;         /* borrowed */
};
/* NOTE: "destory" spelling kept — callers depend on these exact names. */
struct acceptor_ctx *acceptor_ctx_create(const char *profile);
void acceptor_ctx_destory(struct acceptor_ctx *ctx);
#ifdef __cplusplus
}
#endif
#endif

View File

@@ -6,15 +6,104 @@ extern "C"
{
#endif
#include "tfe_addr_tuple4.h"
/* Per-thread TAP descriptors, eventfds and optional io_uring instances. */
struct tap_ctx
{
    int tap_s;                           /* server-side TAP fd */
    int tap_c;                           /* client-side TAP fd */
    int tap_fd;                          /* single-device TAP fd */
    int eventfd;                         /* wakeup eventfds paired with the fds above */
    int eventfd_c;
    int eventfd_s;
    struct io_uring_instance *io_uring_fd;
    struct io_uring_instance *io_uring_c;
    struct io_uring_instance *io_uring_s;
    int buff_size;                       /* size of buff in bytes */
    char *buff;                          /* receive scratch buffer */
};
/* One worker thread; "ref_" members are borrowed, not owned. */
struct acceptor_thread_ctx
{
    pthread_t tid;
    int thread_index;                    /* 0-based worker index */
    struct tap_ctx *tap_ctx;
    struct session_table *session_table;
    struct sf_metrics *sf_metrics;
    struct packet_io *ref_io;
    struct global_metrics *ref_metrics;
    struct policy_enforcer *ref_enforcer;
    struct tfe_proxy *ref_proxy;
    struct acceptor_kni_v4 *ref_acceptor_ctx;
    int session_table_need_reset;        /* flag: rebuild session table on next pass */
};
/* Cached per-direction packet header information. */
struct packet_info
{
    int is_e2i_dir;                      /* non-zero: external-to-internal direction */
    struct addr_tuple4 tuple4;
    char *header_data;
    int header_len;
};
/* Per-session state; cmsg is owned by the session. */
struct session_ctx
{
    int policy_ids;
    uint64_t session_id;
    char *session_addr;
    char client_mac[6];
    char server_mac[6];
    struct packet_info c2s_info;
    struct packet_info s2c_info;
    struct metadata *raw_meta_i2e;
    struct metadata *raw_meta_e2i;
    struct metadata *ctrl_meta;
    struct tfe_cmsg *cmsg;
    struct acceptor_thread_ctx *ref_thread_ctx;
};
/* Top-level acceptor state (v4 control-packet format). */
struct acceptor_kni_v4
{
    int firewall_sids;
    int proxy_sids;
    int sce_sids;
    int nr_worker_threads;               /* clamped to TFE_THREAD_MAX */
    int cpu_affinity_mask[TFE_THREAD_MAX];
    cpu_set_t coremask;
    struct packet_io *io;
    struct global_metrics *metrics;
    struct acceptor_thread_ctx work_threads[TFE_THREAD_MAX];
    struct tfe_proxy *ref_proxy;
};
int is_enable_iouring(struct packet_io *handle);
void tfe_tap_ctx_destory(struct tap_ctx *handler);
struct tap_ctx *tfe_tap_ctx_create(void *ctx);
int packet_io_thread_init(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx);
void packet_io_thread_wait(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx, int timeout_ms);
/* FIX: packet_io_create/packet_io_destory were declared twice in this header;
 * the duplicates have been removed. */
void packet_io_destory(struct packet_io *handle);
struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask);
int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx);
/* FIX: handle_raw_packet_from_tap was declared twice; one copy removed. */
void handle_raw_packet_from_tap(const char *data, int len, void *args);
void handle_decryption_packet_from_tap(const char *data, int len, void *args);
/* FIX: "__cpluscplus" typo corrected and the truncated #endif restored so the
 * extern "C" block closes properly under C++. */
#ifdef __cplusplus
}
#endif

View File

@@ -1,35 +0,0 @@
#ifndef _TFE_TAP_RSS_H_
#define _TFE_TAP_RSS_H_
/* FIX: uint32_t is used below but <stdint.h> was never included — the header
 * was not self-contained and only compiled when an earlier include happened
 * to pull stdint in. */
#include <stdint.h>
#ifdef __cplusplus
extern "C"
{
#endif
#define TAP_RSS_LOG_TAG "TAP_RSS: "
/* Opaque BPF context; defined in the implementation. */
struct bpf_ctx;
/* Return the fd of the loaded RSS BPF program held by ctx. */
int tfe_tap_get_bpf_prog_fd(struct bpf_ctx *ctx);
/* Load the RSS BPF object once, process-wide; returns NULL-able context —
 * TODO confirm failure contract against the implementation. */
struct bpf_ctx *tfe_tap_global_load_rss_bpf(const char *bpf_obj_file, uint32_t bpf_queue_num, uint32_t bpf_hash_mode, uint32_t bpf_debug_log, void *logger);
void tfe_tap_global_unload_rss_bpf(struct bpf_ctx *ctx);
struct tap_ctx *tfe_tap_ctx_create(void *ctx);
/* Build TAP configuration from a profile for thread_num workers. */
struct tap_config *tfe_tap_config_create(const char *profile, int thread_num);
/* NOTE: "destory" spelling kept — callers depend on this exact name. */
void tfe_tap_destory(struct tap_config *tap);
int tfe_tap_set_rps(void *local_logger, const char *tap_name, int thread_num, const char *rps_mask);
/* Per-thread TAP device open/close/read/write helpers. */
int tfe_tap_open_per_thread(const char *tap_dev, int tap_flags, int bpf_prog_fd, void *logger);
void tfe_tap_close_per_thread(int tap_fd);
int tfe_tap_read_per_thread(int tap_fd, char *buff, int buff_size, void *logger);
int tfe_tap_write_per_thread(int tap_fd, const char *data, int data_len, void *logger);
#ifdef __cplusplus
}
#endif
#endif

View File

@@ -1,24 +0,0 @@
#ifndef _TFE_TIMESTAMP_H
#define _TFE_TIMESTAMP_H
/* System include hoisted above the linkage block; extern "C" should wrap
 * only this project's declarations. */
#include <stdint.h>
/* FIX: the guard was misspelled "__cpluscplus" (here and at the close below),
 * so C++ consumers never got C linkage on these symbols and failed to link. */
#ifdef __cplusplus
extern "C"
{
#endif
/* Create a timestamp cache refreshed at most every update_interval_ms. */
struct timestamp *timestamp_new(uint64_t update_interval_ms);
void timestamp_free(struct timestamp *ts);
/* Re-read the clock into the cache. */
void timestamp_update(struct timestamp *ts);
uint64_t timestamp_update_interval_ms(struct timestamp *ts);
/* Cached time in seconds / milliseconds — whether this is wall-clock or
 * monotonic is not visible here; check the implementation. */
uint64_t timestamp_get_sec(struct timestamp *ts);
uint64_t timestamp_get_msec(struct timestamp *ts);
#ifdef __cplusplus
}
#endif
#endif

7304
common/src/mpack.cpp Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -143,4 +143,23 @@ int tap_get_mtu(const char *eth)
close(sockfd);
return ifr.ifr_mtu;
}
/*
 * Program the RPS CPU mask of one RX queue of a network interface by writing
 * rps_mask to /sys/class/net/<eth>/queues/rx-<thread_index>/rps_cpus.
 * Returns 0 on success, -1 on failure (file not writable or write failed).
 */
int tap_set_rps(const char *eth, int thread_index, const char *rps_mask)
{
    char file[1024] = {0};
    snprintf(file, sizeof(file), "/sys/class/net/%s/queues/rx-%d/rps_cpus", eth, thread_index);
    FILE *fp = fopen(file, "w");
    if (fp == NULL)
    {
        TFE_LOG_ERROR(g_default_logger, "unable to set rps on %s, file %s, %s", eth, file, strerror(errno));
        return -1;
    }
    /* FIX: the fwrite result was previously ignored, so a rejected mask
     * (e.g. EINVAL from sysfs) was silently reported as success. */
    size_t mask_len = strlen(rps_mask);
    if (mask_len > 0 && fwrite(rps_mask, mask_len, 1, fp) != 1)
    {
        TFE_LOG_ERROR(g_default_logger, "unable to set rps on %s, file %s, %s", eth, file, strerror(errno));
        fclose(fp);
        return -1;
    }
    TFE_LOG_DEBUG(g_default_logger, "set rps '%s' to %s", rps_mask, file);
    fclose(fp);
    return 0;
}

View File

@@ -1,91 +0,0 @@
#include <assert.h>
#include <MESA/MESA_prof_load.h>
#include "tfe_cmsg.h"
#include "tfe_tap_rss.h"
#include "tfe_acceptor_kni.h"
#include "tfe_metrics.h"
/******************************************************************************
* session_ctx
******************************************************************************/
/* Allocate a fresh, fully zeroed session context.
 * calloc guarantees every pointer member starts out NULL, which
 * session_ctx_free relies on. Aborts via assert if allocation fails. */
struct session_ctx *session_ctx_new()
{
    struct session_ctx *new_ctx;
    new_ctx = (struct session_ctx *)calloc(1, sizeof(struct session_ctx));
    assert(new_ctx != NULL);
    return new_ctx;
}
/* Release a session context created by session_ctx_new.
 * Destroys the owned control message (if any) before freeing the
 * context itself. Passing NULL is a harmless no-op. */
void session_ctx_free(struct session_ctx *ctx)
{
    if (ctx == NULL)
    {
        return;
    }
    if (ctx->cmsg != NULL)
    {
        tfe_cmsg_destroy(ctx->cmsg);
    }
    free(ctx);
}
/******************************************************************************
* acceptor_ctx
******************************************************************************/
/*
 * Build an acceptor context from a profile (config) file.
 * Loads SID ranges, worker-thread count and per-thread CPU affinity, then
 * creates the packet I/O layer, TAP configuration and global metrics.
 * Returns a fully initialized context, or NULL if any sub-object fails to
 * create (partially built state is torn down via acceptor_ctx_destory).
 */
struct acceptor_ctx *acceptor_ctx_create(const char *profile)
{
/* ALLOC is a project macro — presumably zero-initializing; confirm, since
 * the error path relies on unset pointers being NULL. */
struct acceptor_ctx *ctx = ALLOC(struct acceptor_ctx, 1);
/* Profile keys with defaults: firewall_sids=1001, sce_sids=1002, 8 workers. */
MESA_load_profile_int_def(profile, "system", "firewall_sids", (int *)&(ctx->firewall_sids), 1001);
MESA_load_profile_int_def(profile, "system", "service_chaining_sids", (int *)&(ctx->sce_sids), 1002);
MESA_load_profile_int_def(profile, "system", "nr_worker_threads", (int *)&(ctx->nr_worker_threads), 8);
MESA_load_profile_uint_range(profile, "system", "cpu_affinity_mask", TFE_THREAD_MAX, (unsigned int *)ctx->cpu_affinity_mask);
/* Clamp before using the value to index work_threads/cpu_affinity_mask. */
ctx->nr_worker_threads = MIN(ctx->nr_worker_threads, TFE_THREAD_MAX);
/* coremask is the union of each worker's configured CPU. */
CPU_ZERO(&ctx->coremask);
for (int i = 0; i < ctx->nr_worker_threads; i++)
{
int cpu_id = ctx->cpu_affinity_mask[i];
CPU_SET(cpu_id, &ctx->coremask);
}
ctx->io = packet_io_create(profile, ctx->nr_worker_threads, &ctx->coremask);
if (ctx->io == NULL)
{
goto error_out;
}
ctx->config = tfe_tap_config_create(profile, ctx->nr_worker_threads);
if (ctx->config == NULL)
{
goto error_out;
}
ctx->metrics = global_metrics_create();
if (ctx->metrics == NULL)
{
goto error_out;
}
return ctx;
error_out:
/* Tears down whatever was created so far; the destroy helpers are assumed
 * to tolerate NULL members — TODO confirm. */
acceptor_ctx_destory(ctx);
return NULL;
}
/* Tear down an acceptor context created by acceptor_ctx_create.
 * Destroys the owned I/O layer, TAP config and metrics, then frees the
 * context. NULL input is accepted and ignored. */
void acceptor_ctx_destory(struct acceptor_ctx * ctx)
{
    if (ctx == NULL)
    {
        return;
    }
    packet_io_destory(ctx->io);
    tfe_tap_destory(ctx->config);
    global_metrics_destory(ctx->metrics);
    free(ctx);
}

View File

@@ -39,7 +39,6 @@ int ctrl_packet_parser_parse(struct ctrl_pkt_parser *handler, const char *data,
void ctrl_packet_parser_dump(struct ctrl_pkt_parser *handler)
{
uint16_t cmsg_len;
if (handler)
{
TFE_LOG_INFO(g_default_logger, "%s: tsync : %s", LOG_TAG_POLICY, handler->tsync);

View File

@@ -4,8 +4,8 @@
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <msgpack.h>
#include "mpack.h"
#include "tfe_cmsg.h"
#include "tfe_utils.h"
#include "tfe_ctrl_packet.h"
@@ -37,133 +37,139 @@ struct mpack_mmap_id2type
{
int id;
enum tfe_cmsg_tlv_type type;
char *str_name;
const char *str_name;
int size;
int array_index;
}mpack_table[] = {
{.id = 0, .type = TFE_CMSG_POLICY_ID, .str_name = "TFE_CMSG_POLICY_ID", .size = 8},
{.id = 1, .type = TFE_CMSG_TCP_RESTORE_SEQ, .str_name = "TFE_CMSG_TCP_RESTORE_SEQ", .size = 4},
{.id = 2, .type = TFE_CMSG_TCP_RESTORE_ACK, .str_name = "TFE_CMSG_TCP_RESTORE_ACK", .size = 4},
{.id = 3, .type = TFE_CMSG_TCP_RESTORE_MSS_CLIENT, .str_name = "TFE_CMSG_TCP_RESTORE_MSS_CLIENT", .size = 2},
{.id = 4, .type = TFE_CMSG_TCP_RESTORE_MSS_SERVER, .str_name = "TFE_CMSG_TCP_RESTORE_MSS_SERVER", .size = 2},
{.id = 5, .type = TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, .str_name = "TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT", .size = 1},
{.id = 6, .type = TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, .str_name = "TFE_CMSG_TCP_RESTORE_WSACLE_SERVER", .size = 1},
{.id = 7, .type = TFE_CMSG_TCP_RESTORE_SACK_CLIENT, .str_name = "TFE_CMSG_TCP_RESTORE_SACK_CLIENT", .size = 1},
{.id = 8, .type = TFE_CMSG_TCP_RESTORE_SACK_SERVER, .str_name = "TFE_CMSG_TCP_RESTORE_SACK_SERVER", .size = 1},
{.id = 9, .type = TFE_CMSG_TCP_RESTORE_TS_CLIENT, .str_name = "TFE_CMSG_TCP_RESTORE_TS_CLIENT", .size = 1},
{.id = 10, .type = TFE_CMSG_TCP_RESTORE_TS_SERVER, .str_name = "TFE_CMSG_TCP_RESTORE_TS_SERVER", .size = 1},
{.id = 11, .type = TFE_CMSG_TCP_RESTORE_PROTOCOL, .str_name = "TFE_CMSG_TCP_RESTORE_PROTOCOL", .size = 1},
{.id = 12, .type = TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT, .str_name = "TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT", .size = 2},
{.id = 13, .type = TFE_CMSG_TCP_RESTORE_WINDOW_SERVER, .str_name = "TFE_CMSG_TCP_RESTORE_WINDOW_SERVER", .size = 2},
{.id = 14, .type = TFE_CMSG_TCP_RESTORE_TS_CLIENT_VAL, .str_name = "TFE_CMSG_TCP_RESTORE_TS_CLIENT_VAL", .size = 4},
{.id = 15, .type = TFE_CMSG_TCP_RESTORE_TS_SERVER_VAL, .str_name = "TFE_CMSG_TCP_RESTORE_TS_SERVER_VAL", .size = 4},
{.id = 16, .type = TFE_CMSG_TCP_RESTORE_INFO_PACKET_CUR_DIR, .str_name = "TFE_CMSG_TCP_RESTORE_INFO_PACKET_CUR_DIR", .size = 1},
{.id = 17, .type = TFE_CMSG_SRC_SUB_ID, .str_name = "TFE_CMSG_SRC_SUB_ID", .size = 256},
{.id = 18, .type = TFE_CMSG_DST_SUB_ID, .str_name = "TFE_CMSG_DST_SUB_ID", .size = 256},
{.id = 19, .type = TFE_CMSG_SRC_ASN, .str_name = "TFE_CMSG_SRC_ASN", .size = 64},
{.id = 20, .type = TFE_CMSG_DST_ASN, .str_name = "TFE_CMSG_DST_ASN", .size = 64},
{.id = 21, .type = TFE_CMSG_SRC_ORGANIZATION, .str_name = "TFE_CMSG_SRC_ORGANIZATION", .size = 256},
{.id = 22, .type = TFE_CMSG_DST_ORGANIZATION, .str_name = "TFE_CMSG_DST_ORGANIZATION", .size = 256},
{.id = 23, .type = TFE_CMSG_SRC_IP_LOCATION_COUNTRY, .str_name = "TFE_CMSG_SRC_IP_LOCATION_COUNTRY", .size = 256},
{.id = 24, .type = TFE_CMSG_DST_IP_LOCATION_COUNTRY, .str_name = "TFE_CMSG_DST_IP_LOCATION_COUNTRY", .size = 256},
{.id = 25, .type = TFE_CMSG_SRC_IP_LOCATION_PROVINE, .str_name = "TFE_CMSG_SRC_IP_LOCATION_PROVINE", .size = 256},
{.id = 26, .type = TFE_CMSG_DST_IP_LOCATION_PROVINE, .str_name = "TFE_CMSG_DST_IP_LOCATION_PROVINE", .size = 256},
{.id = 27, .type = TFE_CMSG_SRC_IP_LOCATION_CITY, .str_name = "TFE_CMSG_SRC_IP_LOCATION_CITY", .size = 256},
{.id = 28, .type = TFE_CMSG_DST_IP_LOCATION_CITY, .str_name = "TFE_CMSG_DST_IP_LOCATION_CITY", .size = 256},
{.id = 29, .type = TFE_CMSG_SRC_IP_LOCATION_SUBDIVISION, .str_name = "TFE_CMSG_SRC_IP_LOCATION_SUBDIVISION", .size = 256},
{.id = 30, .type = TFE_CMSG_DST_IP_LOCATION_SUBDIVISION, .str_name = "TFE_CMSG_DST_IP_LOCATION_SUBDIVISION", .size = 256},
{.id = 31, .type = TFE_CMSG_SSL_CLIENT_JA3_FINGERPRINT, .str_name = "TFE_CMSG_SSL_CLIENT_JA3_FINGERPRINT", .size = 32},
{.id = 32, .type = TFE_CMSG_FQDN_CAT_ID_VAL, .str_name = "TFE_CMSG_FQDN_CAT_ID_VAL", .size = 4, .array_index = MPACK_ARRAY_FQDN_IDS},
{.id = 33, .str_name = "TFE_SEQ_SIDS", .size = 2, .array_index = MPACK_ARRAY_SEQ_SIDS},
{.id = 34, .str_name = "TFE_ACK_SIDS", .size = 2, .array_index = MPACK_ARRAY_ACK_SIDS},
{.id = 35, .str_name = "TFE_SEQ_ROUTE_CTX", .size = 1, .array_index = MPACK_ARRAY_SEQ_ROUTE_CTX},
{.id = 36, .str_name = "TFE_ACK_ROUTE_CTX", .size = 1, .array_index = MPACK_ARRAY_ACK_ROUTE_CTX}
{.id = 0, .type = TFE_CMSG_TCP_RESTORE_SEQ, .str_name = "TFE_CMSG_TCP_RESTORE_SEQ", .size = 4},
{.id = 1, .type = TFE_CMSG_TCP_RESTORE_ACK, .str_name = "TFE_CMSG_TCP_RESTORE_ACK", .size = 4},
{.id = 2, .type = TFE_CMSG_TCP_RESTORE_MSS_CLIENT, .str_name = "TFE_CMSG_TCP_RESTORE_MSS_CLIENT", .size = 2},
{.id = 3, .type = TFE_CMSG_TCP_RESTORE_MSS_SERVER, .str_name = "TFE_CMSG_TCP_RESTORE_MSS_SERVER", .size = 2},
{.id = 4, .type = TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, .str_name = "TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT", .size = 1},
{.id = 5, .type = TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, .str_name = "TFE_CMSG_TCP_RESTORE_WSACLE_SERVER", .size = 1},
{.id = 6, .type = TFE_CMSG_TCP_RESTORE_SACK_CLIENT, .str_name = "TFE_CMSG_TCP_RESTORE_SACK_CLIENT", .size = 1},
{.id = 7, .type = TFE_CMSG_TCP_RESTORE_SACK_SERVER, .str_name = "TFE_CMSG_TCP_RESTORE_SACK_SERVER", .size = 1},
{.id = 8, .type = TFE_CMSG_TCP_RESTORE_TS_CLIENT, .str_name = "TFE_CMSG_TCP_RESTORE_TS_CLIENT", .size = 1},
{.id = 9, .type = TFE_CMSG_TCP_RESTORE_TS_SERVER, .str_name = "TFE_CMSG_TCP_RESTORE_TS_SERVER", .size = 1},
{.id = 10, .type = TFE_CMSG_TCP_RESTORE_PROTOCOL, .str_name = "TFE_CMSG_TCP_RESTORE_PROTOCOL", .size = 1},
{.id = 11, .type = TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT, .str_name = "TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT", .size = 2},
{.id = 12, .type = TFE_CMSG_TCP_RESTORE_WINDOW_SERVER, .str_name = "TFE_CMSG_TCP_RESTORE_WINDOW_SERVER", .size = 2},
{.id = 13, .type = TFE_CMSG_TCP_RESTORE_TS_CLIENT_VAL, .str_name = "TFE_CMSG_TCP_RESTORE_TS_CLIENT_VAL", .size = 4},
{.id = 14, .type = TFE_CMSG_TCP_RESTORE_TS_SERVER_VAL, .str_name = "TFE_CMSG_TCP_RESTORE_TS_SERVER_VAL", .size = 4},
{.id = 15, .type = TFE_CMSG_TCP_RESTORE_INFO_PACKET_CUR_DIR, .str_name = "TFE_CMSG_TCP_RESTORE_INFO_PACKET_CUR_DIR", .size = 1},
{.id = 16, .type = TFE_CMSG_SRC_SUB_ID, .str_name = "TFE_CMSG_SRC_SUB_ID", .size = 256},
{.id = 17, .type = TFE_CMSG_DST_SUB_ID, .str_name = "TFE_CMSG_DST_SUB_ID", .size = 256},
{.id = 18, .type = TFE_CMSG_SRC_ASN, .str_name = "TFE_CMSG_SRC_ASN", .size = 64},
{.id = 19, .type = TFE_CMSG_DST_ASN, .str_name = "TFE_CMSG_DST_ASN", .size = 64},
{.id = 20, .type = TFE_CMSG_SRC_ORGANIZATION, .str_name = "TFE_CMSG_SRC_ORGANIZATION", .size = 256},
{.id = 21, .type = TFE_CMSG_DST_ORGANIZATION, .str_name = "TFE_CMSG_DST_ORGANIZATION", .size = 256},
{.id = 22, .type = TFE_CMSG_SRC_IP_LOCATION_COUNTRY, .str_name = "TFE_CMSG_SRC_IP_LOCATION_COUNTRY", .size = 256},
{.id = 23, .type = TFE_CMSG_DST_IP_LOCATION_COUNTRY, .str_name = "TFE_CMSG_DST_IP_LOCATION_COUNTRY", .size = 256},
{.id = 24, .type = TFE_CMSG_SRC_IP_LOCATION_PROVINE, .str_name = "TFE_CMSG_SRC_IP_LOCATION_PROVINE", .size = 256},
{.id = 25, .type = TFE_CMSG_DST_IP_LOCATION_PROVINE, .str_name = "TFE_CMSG_DST_IP_LOCATION_PROVINE", .size = 256},
{.id = 26, .type = TFE_CMSG_SRC_IP_LOCATION_CITY, .str_name = "TFE_CMSG_SRC_IP_LOCATION_CITY", .size = 256},
{.id = 27, .type = TFE_CMSG_DST_IP_LOCATION_CITY, .str_name = "TFE_CMSG_DST_IP_LOCATION_CITY", .size = 256},
{.id = 28, .type = TFE_CMSG_SRC_IP_LOCATION_SUBDIVISION, .str_name = "TFE_CMSG_SRC_IP_LOCATION_SUBDIVISION", .size = 256},
{.id = 29, .type = TFE_CMSG_DST_IP_LOCATION_SUBDIVISION, .str_name = "TFE_CMSG_DST_IP_LOCATION_SUBDIVISION", .size = 256},
{.id = 30, .type = TFE_CMSG_SSL_CLIENT_JA3_FINGERPRINT, .str_name = "TFE_CMSG_SSL_CLIENT_JA3_FINGERPRINT", .size = 32},
{.id = 31, .type = TFE_CMSG_FQDN_CAT_ID_VAL, .str_name = "TFE_CMSG_FQDN_CAT_ID_VAL", .size = 4, .array_index = MPACK_ARRAY_FQDN_IDS},
{.id = 32, .str_name = "TFE_SEQ_SIDS", .size = 2, .array_index = MPACK_ARRAY_SEQ_SIDS},
{.id = 33, .str_name = "TFE_ACK_SIDS", .size = 2, .array_index = MPACK_ARRAY_ACK_SIDS},
{.id = 34, .str_name = "TFE_SEQ_ROUTE_CTX", .size = 1, .array_index = MPACK_ARRAY_SEQ_ROUTE_CTX},
{.id = 35, .str_name = "TFE_ACK_ROUTE_CTX", .size = 1, .array_index = MPACK_ARRAY_ACK_ROUTE_CTX}
};
static void fqdn_id_set_cmsg(struct ctrl_pkt_parser *handler, msgpack_object *ptr, int table_index)
static int fqdn_id_set_cmsg(struct ctrl_pkt_parser *handler, mpack_node_t node, int table_index)
{
uint32_t fqdn_val[8] = {0};
tfe_cmsg_set(handler->cmsg, TFE_CMSG_FQDN_CAT_ID_NUM, (const unsigned char *)&ptr->via.array.size, sizeof(uint32_t));
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu array fqdn_id num: [%u]", LOG_TAG_CTRLPKT, handler->session_id, ptr->via.array.size);
for (uint32_t j = 0; j < ptr->via.array.size; j++) {
fqdn_val[j] = ptr->via.array.ptr[j].via.u64;
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu array fqdn_id msgpack cmsg: [%s] -> [%lu]", LOG_TAG_CTRLPKT, handler->session_id, mpack_table[table_index].str_name, ptr->via.array.ptr[j].via.u64);
uint32_t array_cnt = mpack_node_array_length(node);
tfe_cmsg_set(handler->cmsg, TFE_CMSG_FQDN_CAT_ID_NUM, (const unsigned char *)&array_cnt, sizeof(uint32_t));
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu array fqdn_id num: [%u]", LOG_TAG_CTRLPKT, handler->session_id, array_cnt);
for (uint32_t i = 0; i < array_cnt; i++) {
fqdn_val[i] = mpack_node_u32(mpack_node_array_at(node, i));
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu array fqdn_id msgpack cmsg: [%s] -> [%lu]", LOG_TAG_CTRLPKT, handler->session_id, mpack_table[table_index].str_name, fqdn_val[i]);
}
tfe_cmsg_set(handler->cmsg ,TFE_CMSG_FQDN_CAT_ID_VAL, (const unsigned char*)fqdn_val, ptr->via.array.size * sizeof(uint32_t));
return;
tfe_cmsg_set(handler->cmsg, TFE_CMSG_FQDN_CAT_ID_VAL, (const unsigned char*)fqdn_val, array_cnt * sizeof(uint32_t));
return 0;
}
static void sids_array_parse_mpack(struct ctrl_pkt_parser *handler, msgpack_object *ptr, int table_index, int is_seq)
static int sids_array_parse_mpack(struct ctrl_pkt_parser *handler, mpack_node_t node, int table_index, int is_seq)
{
struct sids *sid= is_seq ? &handler->seq_sids : &handler->ack_sids;
struct sids *sid = is_seq ? &handler->seq_sids : &handler->ack_sids;
if (mpack_node_array_length(node) > MR_SID_LIST_MAXLEN) {
TFE_LOG_ERROR(g_default_logger, "%s: session: %lu sid[%u] more than maxlen %d", LOG_TAG_CTRLPKT, handler->session_id, mpack_node_array_length(node), MR_SID_LIST_MAXLEN);
return -1;
}
sid->num = ptr->via.array.size > MR_SID_LIST_MAXLEN ? MR_SID_LIST_MAXLEN : ptr->via.array.size;
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu array sids num: [%u]", LOG_TAG_CTRLPKT, handler->session_id, ptr->via.array.size);
sid->num = mpack_node_array_length(node);
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu array sids num: [%u]", LOG_TAG_CTRLPKT, handler->session_id, sid->num);
for (int i = 0; i < sid->num; i++)
{
sid->elems[i] = ptr->via.array.ptr[i].via.u64;
sid->elems[i] = mpack_node_u16(mpack_node_array_at(node, i));
}
return;
return 0;
}
static void route_ctx_parse_mpack(struct ctrl_pkt_parser *handler, msgpack_object *ptr, int table_index, int is_seq)
static int route_ctx_parse_mpack(struct ctrl_pkt_parser *handler, mpack_node_t node, int table_index, int is_seq)
{
struct route_ctx *ctx = is_seq ? &handler->seq_route_ctx : &handler->ack_route_ctx;
if (mpack_node_array_length(node) > 64) {
TFE_LOG_ERROR(g_default_logger, "%s: session: %lu route ctx[%d] more than maxlen 64", LOG_TAG_CTRLPKT, handler->session_id, mpack_node_array_length(node));
return -1;
}
ctx->len = ptr->via.array.size > 64 ? 64 : ptr->via.array.size;
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu array route ctx num: [%u]", LOG_TAG_CTRLPKT, handler->session_id, ptr->via.array.size);
ctx->len = mpack_node_array_length(node);
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu array route ctx num: [%u]", LOG_TAG_CTRLPKT, handler->session_id, ctx->len);
for (int i = 0; i < ctx->len; i++)
{
memcpy(ctx->data+i, &ptr->via.array.ptr[i].via.u64, 1);
ctx->data[i] = mpack_node_u8(mpack_node_array_at(node, i));
}
return;
return 0;
}
static int proxy_parse_messagepack(msgpack_object *obj, void *ctx)
static int proxy_parse_messagepack(mpack_node_t node, void *ctx)
{
uint64_t value = 0;
struct ctrl_pkt_parser *handler = (struct ctrl_pkt_parser *)ctx;
for (unsigned int i = 0; i < obj->via.array.size; i++) {
msgpack_object *ptr = &obj->via.array.ptr[i];
if (mpack_node_is_nil(mpack_node_map_cstr(node, "rule_ids")))
{
TFE_LOG_ERROR(g_default_logger, "%s: unexpected control packet: (rule_ids no found)", LOG_TAG_CTRLPKT);
return -1;
}
handler->tfe_policy_id_num = mpack_node_array_length(mpack_node_map_cstr(node, "rule_ids"));
for (int i = 0; i < handler->tfe_policy_id_num; i++) {
handler->tfe_policy_ids[i] = mpack_node_u64(mpack_node_array_at(mpack_node_map_cstr(node, "rule_ids"), i));
}
if (i == 0) {
if (ptr->type == MSGPACK_OBJECT_ARRAY) {
handler->tfe_policy_id_num = ptr->via.array.size;
for (uint32_t j = 0; j < ptr->via.array.size; j++) {
handler->tfe_policy_ids[j] = ptr->via.array.ptr[j].via.u64;
}
tfe_cmsg_set(handler->cmsg, mpack_table[i].type, (const unsigned char *)&handler->tfe_policy_ids[0], sizeof(uint64_t));
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu interger msgpack cmsg: [%s] num: [%d]", LOG_TAG_CTRLPKT, handler->session_id, mpack_table[i].str_name, handler->tfe_policy_id_num);
for (int j = 0; j < handler->tfe_policy_id_num; j++) {
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu policy id:%lu ", LOG_TAG_CTRLPKT, handler->session_id, handler->tfe_policy_ids[j]);
}
}
continue;
if (handler->tfe_policy_id_num) {
tfe_cmsg_set(handler->cmsg, TFE_CMSG_POLICY_ID, (const unsigned char *)&handler->tfe_policy_ids[0], sizeof(uint64_t));
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu tfe policy id num: [%d]", LOG_TAG_CTRLPKT, handler->session_id, handler->tfe_policy_id_num);
for (int i = 0; i < handler->tfe_policy_id_num; i++) {
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu policy id:%lu ", LOG_TAG_CTRLPKT, handler->session_id, handler->tfe_policy_ids[i]);
}
}
switch (ptr->type) {
case MSGPACK_OBJECT_POSITIVE_INTEGER:
// TFE_CMSG_TCP_RESTORE_PROTOCOL tsg master 发送数据错误临时强制设置为1
if (i == 11)
{
uint8_t protocol = 1;
tfe_cmsg_set(handler->cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (const unsigned char *)&protocol, 1);
}
else
{
tfe_cmsg_set(handler->cmsg, mpack_table[i].type, (const unsigned char *)&ptr->via.u64, mpack_table[i].size);
}
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu interger msgpack cmsg: [%s] -> [%lu]", LOG_TAG_CTRLPKT, handler->session_id, mpack_table[i].str_name, ptr->via.u64);
mpack_node_t tcp_handshake = mpack_node_map_cstr(node, "tcp_handshake");
int cmsg_array_cnt = mpack_node_array_length(tcp_handshake);
for (int i = 0; i < cmsg_array_cnt; i++) {
mpack_node_t ptr = mpack_node_array_at(tcp_handshake, i);
switch (mpack_node_type(ptr)) {
case mpack_type_uint:
value = mpack_node_u64(ptr);
tfe_cmsg_set(handler->cmsg, mpack_table[i].type, (const unsigned char *)&value, mpack_table[i].size);
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu interger msgpack cmsg: [%s] -> [%lu]", LOG_TAG_CTRLPKT, handler->session_id, mpack_table[i].str_name, value);
break;
case MSGPACK_OBJECT_STR:
tfe_cmsg_set(handler->cmsg, mpack_table[i].type, (const unsigned char *)ptr->via.str.ptr, ptr->via.str.size);
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu string msgpack cmsg: [%s] -> [%s]", LOG_TAG_CTRLPKT, handler->session_id, mpack_table[i].str_name, ptr->via.str.ptr);
case mpack_type_str:
tfe_cmsg_set(handler->cmsg, mpack_table[i].type, (const unsigned char *)mpack_node_str(ptr), mpack_node_strlen(ptr));
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu string msgpack cmsg: [%s] -> [%s]", LOG_TAG_CTRLPKT, handler->session_id, mpack_table[i].str_name, mpack_node_str(ptr));
break;
case MSGPACK_OBJECT_NIL:
case mpack_type_nil:
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu msgpack cmsg: [%s] -> [nil]", LOG_TAG_CTRLPKT, handler->session_id, mpack_table[i].str_name);
break;
case MSGPACK_OBJECT_ARRAY:
case mpack_type_array:
switch(mpack_table[i].array_index)
{
case MPACK_ARRAY_FQDN_IDS:
@@ -195,101 +201,112 @@ static int proxy_parse_messagepack(msgpack_object *obj, void *ctx)
int parse_messagepack(const char* data, size_t length, void *ctx)
{
struct ctrl_pkt_parser *handler = (struct ctrl_pkt_parser *)ctx;
size_t off = 0;
msgpack_object *obj = NULL;
msgpack_object *ptr = NULL;
char buff[16] = {0};
mpack_node_t params;
mpack_node_t sce_map;
mpack_node_t proxy_map;
mpack_tree_t tree;
mpack_tree_init_data(&tree, data, length);
mpack_tree_parse(&tree);
mpack_node_t root = mpack_tree_root(&tree);
if (mpack_node_is_nil(root))
{
TFE_LOG_ERROR(g_default_logger, "%s: unexpected control packet: (invalid mpack format)", LOG_TAG_CTRLPKT);
goto error;
}
if (mpack_node_is_nil(mpack_node_map_cstr(root, "tsync")))
{
TFE_LOG_ERROR(g_default_logger, "%s: unexpected control packet: (tsync no found)", LOG_TAG_CTRLPKT);
goto error;
}
mpack_node_copy_cstr(mpack_node_map_cstr(root, "tsync"), handler->tsync, sizeof(handler->tsync));
if (strcmp(handler->tsync, "2.0") != 0)
{
TFE_LOG_ERROR(g_default_logger, "%s: unexpected control packet: (invalid tsync version) %s", LOG_TAG_CTRLPKT, handler->tsync);
goto error;
}
if (mpack_node_is_nil(mpack_node_map_cstr(root, "session_id")))
{
TFE_LOG_ERROR(g_default_logger, "%s: unexpected control packet: (session_id no found)", LOG_TAG_CTRLPKT);
goto error;
}
handler->session_id = mpack_node_u64(mpack_node_map_cstr(root, "session_id"));
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
msgpack_unpack_return ret = msgpack_unpack_next(&unpacked, data, length, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
TFE_LOG_DEBUG(g_default_logger, "%s: unexpected control packet: data[%s]", LOG_TAG_CTRLPKT, data);
goto end;
if (mpack_node_is_nil(mpack_node_map_cstr(root, "state")))
{
TFE_LOG_ERROR(g_default_logger, "%s: unexpected control packet: (state no found)", LOG_TAG_CTRLPKT);
goto error;
}
mpack_node_copy_cstr(mpack_node_map_cstr(root, "state"), buff, sizeof(buff));
if (strncasecmp(buff, "opening", sizeof(buff)) == 0)
{
handler->state = SESSION_STATE_OPENING;
}
else if (strncasecmp(buff, "active", sizeof(buff)) == 0)
{
handler->state = SESSION_STATE_ACTIVE;
}
else if (strncasecmp(buff, "closing", sizeof(buff)) == 0)
{
handler->state = SESSION_STATE_CLOSING;
}
else if (strncasecmp(buff, "resetall", sizeof(buff)) == 0)
{
handler->state = SESSION_STATE_RESETALL;
}
else
{
TFE_LOG_DEBUG(g_default_logger, "%s: unexpected control packet: (invalid state value) %s", LOG_TAG_CTRLPKT, buff);
}
obj = &unpacked.data;
if (obj->type != MSGPACK_OBJECT_ARRAY) {
TFE_LOG_DEBUG(g_default_logger, "%s: unexpected control packet: msgpack type is not MSGPACK_OBJECT_ARRAY", LOG_TAG_CTRLPKT);
goto end;
if (mpack_node_is_nil(mpack_node_map_cstr(root, "method")))
{
TFE_LOG_ERROR(g_default_logger, "%s: unexpected control packet: (method no found)", LOG_TAG_CTRLPKT);
goto error;
}
mpack_node_copy_cstr(mpack_node_map_cstr(root, "method"), handler->method, sizeof(handler->method));
for (unsigned int i = 0; i < obj->via.array.size; i++) {
ptr = &obj->via.array.ptr[i];
switch (i) {
case INDEX_TSYNC:
if (ptr->type == MSGPACK_OBJECT_STR) {
memcpy(handler->tsync, ptr->via.str.ptr, ptr->via.str.size);
if (mpack_node_is_nil(mpack_node_map_cstr(root, "params")))
{
TFE_LOG_ERROR(g_default_logger, "%s: unexpected control packet: (params no found)", LOG_TAG_CTRLPKT);
goto error;
}
params = mpack_node_map_cstr(root, "params");
if (!mpack_node_is_nil(mpack_node_map_cstr(params, "sce")))
{
sce_map = mpack_node_map_cstr(params, "sce");
if (mpack_node_is_nil(mpack_node_map_cstr(sce_map, "rule_ids")))
{
TFE_LOG_ERROR(g_default_logger, "%s: unexpected control packet: (sce rule_ids no found)", LOG_TAG_CTRLPKT);
goto error;
}
handler->sce_policy_id_num = mpack_node_array_length(mpack_node_map_cstr(sce_map, "rule_ids"));
for (int i = 0; i < handler->sce_policy_id_num; i++) {
handler->sce_policy_ids[i] = mpack_node_u64(mpack_node_array_at(mpack_node_map_cstr(sce_map, "rule_ids"), i));
}
if (handler->sce_policy_id_num) {
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu sce policy id num: [%d]", LOG_TAG_CTRLPKT, handler->session_id, handler->sce_policy_id_num);
for (int i = 0; i < handler->sce_policy_id_num; i++) {
TFE_LOG_DEBUG(g_default_logger, "%s: session: %lu policy id:%lu ", LOG_TAG_CTRLPKT, handler->session_id, handler->sce_policy_ids[i]);
}
else {
TFE_LOG_DEBUG(g_default_logger, "%s: unexpected control packet: (invalid tsync type) %02x", LOG_TAG_CTRLPKT, ptr->type);
}
break;
case INDEX_SESSION_ID:
if (ptr->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
handler->session_id = ptr->via.u64;
}
else {
TFE_LOG_DEBUG(g_default_logger, "%s: unexpected control packet: (invalid session id type) %02x", LOG_TAG_CTRLPKT, ptr->type);
}
break;
case INDEX_STATE:
if (ptr->type == MSGPACK_OBJECT_STR) {
if (strncasecmp(ptr->via.str.ptr, "opening", ptr->via.str.size) == 0)
{
handler->state = SESSION_STATE_OPENING;
}
else if (strncasecmp(ptr->via.str.ptr, "active", ptr->via.str.size) == 0)
{
handler->state = SESSION_STATE_ACTIVE;
}
else if (strncasecmp(ptr->via.str.ptr, "closing", ptr->via.str.size) == 0)
{
handler->state = SESSION_STATE_CLOSING;
}
else if (strncasecmp(ptr->via.str.ptr, "resetall", ptr->via.str.size) == 0)
{
handler->state = SESSION_STATE_RESETALL;
}
else
{
TFE_LOG_DEBUG(g_default_logger, "%s: unexpected control packet: (invalid state value) %s", LOG_TAG_CTRLPKT, ptr->via.str.ptr);
}
}
else {
TFE_LOG_DEBUG(g_default_logger, "%s: unexpected control packet: (invalid state type) %02x", LOG_TAG_CTRLPKT, ptr->type);
}
break;
case INDEX_METHOD:
if (ptr->type == MSGPACK_OBJECT_STR) {
memcpy(handler->method, ptr->via.str.ptr, ptr->via.str.size);
}
else {
TFE_LOG_DEBUG(g_default_logger, "%s: unexpected control packet: (invalid method type) %02x", LOG_TAG_CTRLPKT, ptr->type);
}
break;
case INDEX_VALUE_SCE:
if (ptr->type == MSGPACK_OBJECT_ARRAY) {
msgpack_object rule_id = ptr->via.array.ptr[0];
handler->sce_policy_id_num = rule_id.via.array.size;
for (uint32_t j = 0; j < rule_id.via.array.size; j++) {
handler->sce_policy_ids[j] = rule_id.via.array.ptr[j].via.u64;
}
}
break;
case INDEX_VALUE_PROXY:
if (ptr->type == MSGPACK_OBJECT_ARRAY) {
proxy_parse_messagepack(ptr, handler);
}
else {
TFE_LOG_DEBUG(g_default_logger, "%s: unexpected control packet: (invalid proxy type) %02x", LOG_TAG_CTRLPKT, ptr->type);
}
break;
default:
break;
}
}
end:
msgpack_unpacked_destroy(&unpacked);
if (mpack_node_is_nil(mpack_node_map_cstr(params, "proxy")))
{
TFE_LOG_ERROR(g_default_logger, "%s: unexpected control packet: (proxy no found)", LOG_TAG_CTRLPKT);
goto error;
}
proxy_map = mpack_node_map_cstr(params, "proxy");
proxy_parse_messagepack(proxy_map, handler);
mpack_tree_destroy(&tree);
return 0;
error:
mpack_tree_destroy(&tree);
return -1;
}

View File

@@ -3,15 +3,20 @@
#include <netinet/udp.h>
#include <netinet/tcp.h>
#include <netinet/ether.h>
#include <linux/if_tun.h>
#include <sys/eventfd.h>
#include <marsio.h>
#include <cjson/cJSON.h>
#include <MESA/MESA_prof_load.h>
#include <tfe_utils.h>
// #include <tfe_proxy.h>
#include <proxy.h>
#include "tfe_acceptor_kni.h"
#include <intercept_policy.h>
#include <unistd.h>
#include <time.h>
#include "tfe_ctrl_packet.h"
#include "tfe_raw_packet.h"
#include "io_uring.h"
@@ -21,25 +26,13 @@
#include "tfe_stream.h"
#include "raw_socket.h"
#include "packet_construct.h"
#include "tfe_tap_rss.h"
#include <intercept_policy.h>
#include "mpack.h"
#include "tap.h"
#include "bpf_obj.h"
#include "tfe_session_table.h"
#include "tfe_packet_io.h"
#include <time.h>
/*
* add: vxlan_hdr
* del: marsio_buff_ctrlzone_reset()
* +----+ NF2SF +----+
* | |--------------------------->| |
* | | | |
* | |-------+ | |-------+
* | NF | | NF2NF (undo) | SF | | SF2SF (del old vxlan_hdr; add new vxlan_hdr)
* | |<------+ | |<------+
* | | | |
* | |<---------------------------| |
* | | SF2NF | |
* +---+ del: vxlan_hdr +----+
* add: session_id + route_ctx + sid
*/
/******************************************************************************
* Struct
@@ -51,12 +44,41 @@
#define SET_TRAFFIC_IS_DECRYPTED(field) (field | TRAFFIC_IS_DECRYPTED)
#define CLEAR_TRAFFIC_IS_DECRYPTED(field) (field & ~TRAFFIC_IS_DECRYPTED)
struct config
{
int bypass_all_traffic;
int rx_burst_max;
int enable_iouring;
int enable_debuglog;
int ring_size;
int buff_size;
int flags;
int sq_thread_idle;
int bpf_debug_log;
int bpf_hash_mode;
int tap_allow_mutilthread;
char bpf_obj[1024];
char src_mac[6];
char tap_mac[6];
char tap_c_mac[6];
char tap_s_mac[6];
char dev_tap[16];
char dev_tap_c[16];
char dev_tap_s[16];
int tap_rps_enable;
char tap_rps_mask[TFE_SYMBOL_MAX];
char app_symbol[256];
char dev_nf_interface[256];
struct bpf_obj_ctx *tap_bpf_ctx;
};
struct device
@@ -141,14 +163,11 @@ extern void chaining_policy_enforce(struct chaining_policy_enforcer *enforcer, s
* STATIC
******************************************************************************/
// NOTE(review): this span is rendered diff residue — the next two lines are
// the OLD and NEW signatures of the same function; only the
// (session_id, info) form matches the surviving log line below.
static void time_echo(struct addr_tuple4 inner_addr)
static void time_echo(uint64_t session_id, char *info)
{
// Log the current wall-clock time tagged with the session id and a caller-
// supplied checkpoint label (used to trace packet-path latency).
time_t t;
time(&t);
// NOTE(review): the three lines below (addr_tuple4 formatting) belong to the
// deleted variant of this function and are superseded by the last log call.
char *addr_string = addr_tuple4_to_str(&inner_addr);
TFE_LOG_ERROR(g_default_logger, "%s: session:%s, time:%s", LOG_TAG_PKTIO, addr_string, ctime(&t));
free(addr_string);
// ctime() output already ends in '\n', so `info` lands on the next line.
TFE_LOG_ERROR(g_default_logger, "%s: session:%lu, time:%s %s", LOG_TAG_PKTIO, session_id, ctime(&t), info);
}
// return 0 : not keepalive packet
@@ -173,6 +192,38 @@ static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff)
}
}
/*
 * Write one frame to a tap file descriptor.
 * A short or failed write is reported through g_default_logger (the
 * `logger` parameter is accepted for interface compatibility but unused).
 * Returns the raw write(2) result: the byte count, or -1 on error.
 */
static int tfe_tap_write_per_thread(int tap_fd, const char *data, int data_len, void *logger)
{
    int written = write(tap_fd, data, data_len);
    if (written == data_len)
        return written;
    TFE_LOG_ERROR(g_default_logger, "%s: need send %dB, only send %dB, aborting: %s", LOG_TAG_PKTIO, data_len, written, strerror(errno));
    return written;
}
/*
 * Allocate a zero-initialized session context.
 * Aborts (assert) when allocation fails; never returns NULL.
 */
static struct session_ctx *session_ctx_new()
{
    struct session_ctx *fresh = (struct session_ctx *)calloc(1, sizeof(*fresh));
    assert(fresh != NULL);
    return fresh;
}
/*
 * Release a session context created by session_ctx_new().
 * Destroys the attached control-message object first (if present), then
 * frees the struct itself. Safe to call with NULL.
 *
 * Fix: removed the dead store `ctx = 0;` after free() — it only cleared
 * the local parameter copy and had no effect on the caller's pointer.
 */
static void session_ctx_free(struct session_ctx *ctx)
{
    if (ctx == NULL)
        return;
    if (ctx->cmsg)
    {
        tfe_cmsg_destroy(ctx->cmsg);
    }
    free(ctx);
}
static void session_value_free_cb(void *ctx)
{
struct session_ctx *s_ctx = (struct session_ctx *)ctx;
@@ -659,10 +710,54 @@ static int tcp_restore_set_from_pkg(struct addr_tuple4 *tuple4, struct tcp_resto
// return -1 : error
static int packet_io_config(const char *profile, struct config *config)
{
int ret = 0;
MESA_load_profile_int_def(profile, "PACKET_IO", "bypass_all_traffic", (int *)&config->bypass_all_traffic, 0);
MESA_load_profile_int_def(profile, "PACKET_IO", "rx_burst_max", (int *)&(config->rx_burst_max), 1);
MESA_load_profile_string_nodef(profile, "PACKET_IO", "app_symbol", config->app_symbol, sizeof(config->app_symbol));
MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_nf_interface", config->dev_nf_interface, sizeof(config->dev_nf_interface));
MESA_load_profile_string_def(profile, "PACKET_IO", "tap_name", config->dev_tap, sizeof(config->dev_tap), "tap0");
MESA_load_profile_int_nodef(profile, "PACKET_IO", "tap_allow_mutilthread", &config->tap_allow_mutilthread);
MESA_load_profile_string_nodef(profile, "PACKET_IO", "bpf_obj", config->bpf_obj, sizeof(config->bpf_obj));
MESA_load_profile_int_nodef(profile, "PACKET_IO", "bpf_debug_log", (int *)&config->bpf_debug_log);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "bpf_hash_mode", (int *)&config->bpf_hash_mode);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "tap_rps_enable", &config->tap_rps_enable);
MESA_load_profile_string_nodef(profile, "PACKET_IO", "tap_rps_mask", config->tap_rps_mask, sizeof(config->tap_rps_mask));
MESA_load_profile_int_nodef(profile, "PACKET_IO", "enable_iouring", &config->enable_iouring);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "enable_debuglog", &config->enable_debuglog);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "ring_size", &config->ring_size);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "buff_size", &config->buff_size);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "flags", &config->flags);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "sq_thread_idle", &config->sq_thread_idle);
MESA_load_profile_string_def(profile, "traffic_steering", "device_client", config->dev_tap_c, sizeof(config->dev_tap_c), "tap_c");
MESA_load_profile_string_def(profile, "traffic_steering", "device_server", config->dev_tap_s, sizeof(config->dev_tap_s), "tap_s");
char src_mac_addr[18] = {0};
ret = MESA_load_profile_string_nodef(profile, "PACKET_IO", "src_mac_addr", src_mac_addr, sizeof(src_mac_addr));
if(ret < 0){
TFE_LOG_ERROR(g_default_logger, "%s: invalid src_mac_addr: src_mac_addr not set, profile = %s, section = PACKET_IO", LOG_TAG_PKTIO, profile);
return -1;
}
str_to_mac(src_mac_addr, config->src_mac);
ret = get_mac_by_device_name(config->dev_tap, config->tap_mac);
if (ret != 0) {
TFE_LOG_ERROR(g_default_logger, "%s: invalid tap_name: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap);
return -1;
}
ret = get_mac_by_device_name(config->dev_tap_c, config->tap_c_mac);
if (ret != 0) {
TFE_LOG_ERROR(g_default_logger, "%s: invalid device_client: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap_c);
return -1;
}
ret = get_mac_by_device_name(config->dev_tap_s, config->tap_s_mac);
if (ret != 0) {
TFE_LOG_ERROR(g_default_logger, "%s: invalid device_server: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap_s);
return -1;
}
if (config->rx_burst_max > RX_BURST_MAX)
{
TFE_LOG_ERROR(g_default_logger, "%s: invalid rx_burst_max, exceeds limit %d", LOG_TAG_PKTIO, RX_BURST_MAX);
@@ -685,6 +780,22 @@ static int packet_io_config(const char *profile, struct config *config)
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->rx_burst_max : %d", LOG_TAG_PKTIO, config->rx_burst_max);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->app_symbol : %s", LOG_TAG_PKTIO, config->app_symbol);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->dev_nf_interface : %s", LOG_TAG_PKTIO, config->dev_nf_interface);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->tap_name : %s", LOG_TAG_PKTIO, config->tap_rps_mask);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->tap_allow_mutilthread : %d", LOG_TAG_PKTIO, config->tap_allow_mutilthread);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->bpf_obj : %s", LOG_TAG_PKTIO, config->bpf_obj);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->bpf_debug_log : %d", LOG_TAG_PKTIO, config->bpf_debug_log);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->bpf_hash_mode : %d", LOG_TAG_PKTIO, config->bpf_hash_mode);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->tap_rps_enable : %d", LOG_TAG_PKTIO, config->tap_rps_enable);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->tap_rps_mask : %s", LOG_TAG_PKTIO, config->tap_rps_mask);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->enable_iouring : %d", LOG_TAG_PKTIO, config->enable_iouring);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->enable_debuglog : %d", LOG_TAG_PKTIO, config->enable_debuglog);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->ring_size : %d", LOG_TAG_PKTIO, config->ring_size);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->buff_size : %d", LOG_TAG_PKTIO, config->buff_size);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->flags : %d", LOG_TAG_PKTIO, config->flags);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->sq_thread_idle : %d", LOG_TAG_PKTIO, config->sq_thread_idle);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->device_client : %s", LOG_TAG_PKTIO, config->dev_tap_c);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->device_server : %s", LOG_TAG_PKTIO, config->dev_tap_s);
TFE_LOG_DEBUG(g_default_logger, "%s: PACKET_IO->src_mac_addr : %s", LOG_TAG_PKTIO, src_mac_addr);
return 0;
}
@@ -826,28 +937,169 @@ static void packet_io_dump_metadata(marsio_buff_t *tx_buff, struct metadata *met
/*
 * Control-packet layout (tsync 2.0) produced by send_event_log() below:
 * {
 *     "tsync": "2.0",
 *     "session_id": "123456789",
 *     "state": "active",
 *     "method": "log_update",
 *     "params": {
 *         "proxy": {
 *             "ssl_intercept_info": [
 *                 ... mpack array of SSL interception fields ...
 *             ]
 *         }
 *     }
 * }
 */
/*
 * Serialize the session's SSL interception results from the cmsg store into
 * a tsync-2.0 msgpack control message (see the format comment above).
 * The sending path is currently commented out, so the buffer is built,
 * then destroyed without being transmitted.
 *
 * NOTE(review): the growable writer allocates `data`; every early `return`
 * on a tfe_cmsg_get_value() failure below skips mpack_writer_destroy() and
 * free(data) — apparent memory leak on those paths. The `end:` label is
 * never targeted by a goto in this body.
 */
static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
// NOTE(review): diff residue — the next two lines are the old and new
// declarations of the same variable; only the acceptor_kni_v4 form is live.
struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
// Growable msgpack output buffer; `data`/`size` are owned by the writer.
char *data;
size_t size;
mpack_writer_t writer;
mpack_writer_init_growable(&writer, &data, &size);
// Envelope: tsync version, session id, state, method.
mpack_build_map(&writer);
mpack_write_cstr(&writer, "tsync");
mpack_write_cstr(&writer, "2.0");
mpack_write_cstr(&writer, "session_id");
mpack_write_u64(&writer, s_ctx->session_id);
mpack_write_cstr(&writer, "state");
mpack_write_cstr(&writer, "active");
mpack_write_cstr(&writer, "method");
mpack_write_cstr(&writer, "log_update");
mpack_write_cstr(&writer, "params");
mpack_build_map(&writer);
mpack_write_cstr(&writer, "proxy");
mpack_build_map(&writer);
mpack_write_cstr(&writer, "ssl_intercept_info");
mpack_build_array(&writer); // cmsg value begin
// Fetch each SSL interception field from the session's cmsg; any miss
// aborts the whole message (see leak note in the header comment).
int ret = 0;
uint8_t ssl_intercept_status = 0;
uint16_t length = 0;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, (unsigned char *)&ssl_intercept_status, sizeof(ssl_intercept_status), &length);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl intercept state from cmsg: %s", strerror(-ret));
return;
}
uint64_t ssl_upstream_latency = 0;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_LATENCY, (unsigned char *)&ssl_upstream_latency, sizeof(ssl_upstream_latency), &length);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl upstream latency from cmsg: %s", strerror(-ret));
return;
}
uint64_t ssl_downstream_latency = 0;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_LATENCY, (unsigned char *)&ssl_downstream_latency, sizeof(ssl_downstream_latency), &length);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl downstream latency from cmsg: %s", strerror(-ret));
return;
}
char ssl_upstream_version[64] = {0};
// NOTE(review): ssl_upstream_version_length is declared but the call below
// receives the shared `length` variable instead — likely unintended.
uint16_t ssl_upstream_version_length = 0;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_VERSION, (unsigned char *)ssl_upstream_version, sizeof(ssl_upstream_version), &length);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl upstream version from cmsg: %s", strerror(-ret));
return;
}
char ssl_downstream_version[64] = {0};
uint16_t ssl_downstream_version_length = 0;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_VERSION, (unsigned char *)ssl_downstream_version, sizeof(ssl_downstream_version), &ssl_downstream_version_length);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl downstream version from cmsg: %s", strerror(-ret));
return;
}
uint8_t ssl_pinning_state = 0;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&ssl_pinning_state, sizeof(ssl_pinning_state), &length);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl pinning state from cmsg: %s", strerror(-ret));
return;
}
uint8_t ssl_cert_verify = 0;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CERT_VERIFY, (unsigned char *)&ssl_cert_verify, sizeof(ssl_cert_verify), &length);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl cert verify from cmsg: %s", strerror(-ret));
return;
}
char ssl_error[64] = {0};
uint16_t ssl_error_length = 0;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_ERROR, (unsigned char *)ssl_error, sizeof(ssl_error), &ssl_error_length);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl error from cmsg: %s", strerror(-ret));
return;
}
char ssl_passthrough_reason[32] = {0};
uint16_t ssl_passthrough_reason_length = 0;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PASSTHROUGH_REASON, (unsigned char *)ssl_passthrough_reason, sizeof(ssl_passthrough_reason), &ssl_passthrough_reason_length);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl passthrough reason from cmsg: %s", strerror(-ret));
return;
}
// Positional array: order is the wire contract with the peer's parser.
mpack_write_u8(&writer, ssl_intercept_status);
mpack_write_u64(&writer, ssl_upstream_latency);
mpack_write_u64(&writer, ssl_downstream_latency);
mpack_write_str(&writer, ssl_upstream_version, ssl_upstream_version_length);
mpack_write_str(&writer, ssl_downstream_version, ssl_downstream_version_length);
mpack_write_u8(&writer, ssl_pinning_state);
mpack_write_u8(&writer, ssl_cert_verify);
mpack_write_str(&writer, ssl_error, ssl_error_length);
mpack_write_str(&writer, ssl_passthrough_reason, ssl_passthrough_reason_length);
mpack_complete_array(&writer);
mpack_complete_map(&writer);
mpack_complete_map(&writer);
mpack_complete_map(&writer);
// Transmission path kept for reference; currently disabled.
// marsio_buff_t *tx_buffs[1];
// char *raw_packet_header_data = session_ctx->ctrl_meta->raw_data;
// int raw_packet_header_len = session_ctx->ctrl_meta->l7offset;
// marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread_index);
// char *dst = marsio_buff_append(tx_buffs[0], raw_packet_header_len + size);
// memcpy(dst, raw_packet_header_data, raw_packet_header_len);
// memcpy(dst + raw_packet_header_len, data, size);
// struct metadata meta = {0};
// meta.session_id = session_ctx->session_id;
// meta.l7offset = raw_packet_header_len;
// meta.is_ctrl_pkt = 1;
// meta.sids.num = 1;
// meta.sids.elems[0] = sce_ctx->firewall_sids;
// route_ctx_copy(&meta.route_ctx, &session_ctx->ctrl_meta->route_ctx);
// mbuff_set_metadata(tx_buffs[0], &meta);
// int nsend = marsio_buff_datalen(tx_buffs[0]);
// marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_index, tx_buffs, 1);
end:
mpack_writer_destroy(&writer);
free(data);
return;
}
@@ -856,12 +1108,10 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx)
static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
uint8_t *iptmp = NULL;
int ret = 0;
int fd_downstream = 0;
int fd_upstream = 0;
int fd_fake_c = 0;
int fd_fake_s = 0;
uint64_t rule_id = 0;
uint16_t size = 0;
char *addr_str = NULL;
@@ -878,7 +1128,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
struct sockaddr_in *in_addr_server = (struct sockaddr_in *)&restore_info.server.addr;
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
struct raw_pkt_parser raw_parser;
raw_packet_parser_init(&raw_parser, meta->session_id, LAYER_TYPE_ALL, 8);
@@ -941,7 +1191,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
free(addr_str);
fd_upstream = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), thread->ref_tap_config->tap_device, 0x65);
fd_upstream = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), packet_io->config.dev_tap, 0x65);
if (fd_upstream < 0)
{
TFE_LOG_ERROR(g_default_logger, "Failed at tcp_restore_fd_create(UPSTREAM)");
@@ -949,7 +1199,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
}
// tcp repair S2C
fd_downstream = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client), thread->ref_tap_config->tap_device, 0x65);
fd_downstream = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client), packet_io->config.dev_tap, 0x65);
if (fd_downstream < 0)
{
TFE_LOG_ERROR(g_default_logger, "Failed at tcp_restore_fd_create(DOWNSTREAM)");
@@ -1080,7 +1330,7 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser
static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
TFE_LOG_ERROR(g_default_logger, "%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id);
@@ -1098,7 +1348,7 @@ static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser
static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
struct metadata meta;
@@ -1151,10 +1401,8 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf
static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
struct global_metrics *g_metrics = thread->ref_metrics;
struct addr_tuple4 inner_addr;
memset(&inner_addr, 0, sizeof(struct addr_tuple4));
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
@@ -1169,6 +1417,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1;
}
time_echo(meta.session_id, "raw pkg from nf start");
struct session_node *node = session_table_search_by_id(thread->session_table, meta.session_id);
if (node == NULL)
@@ -1186,6 +1435,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
{
metadata_deep_copy(s_ctx->raw_meta_e2i, &meta);
}
s_ctx->raw_meta_e2i->sids = meta.sids;
}
else
{
@@ -1193,14 +1443,15 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
{
metadata_deep_copy(s_ctx->raw_meta_i2e, &meta);
}
s_ctx->raw_meta_i2e->sids = meta.sids;
}
if (meta.is_decrypted)
{
// c2s
if (meta.is_e2i_dir == s_ctx->c2s_info.is_e2i_dir) {
add_ether_header(raw_data, acceptor_ctx->config->src_mac, acceptor_ctx->config->tap_s_mac);
if (acceptor_ctx->config->enable_iouring) {
add_ether_header(raw_data, packet_io->config.tap_c_mac, packet_io->config.tap_s_mac);
if (packet_io->config.enable_iouring) {
io_uring_submit_write_entry(thread->tap_ctx->io_uring_s, raw_data, raw_len);
}
else {
@@ -1210,8 +1461,8 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
}
// s2c
else {
add_ether_header(raw_data, acceptor_ctx->config->src_mac, acceptor_ctx->config->tap_c_mac);
if (acceptor_ctx->config->enable_iouring) {
add_ether_header(raw_data, packet_io->config.tap_s_mac, packet_io->config.tap_c_mac);
if (packet_io->config.enable_iouring) {
io_uring_submit_write_entry(thread->tap_ctx->io_uring_c, raw_data, raw_len);
}
else {
@@ -1229,8 +1480,8 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
buff_size = raw_len - ((char *)payload - meta->raw_data) + sizeof(struct ethhdr) + sizeof(struct ip) + sizeof(struct tcphdr);
#endif
// send to tap0
add_ether_header(raw_data, acceptor_ctx->config->src_mac, acceptor_ctx->config->tap_mac);
if (acceptor_ctx->config->enable_iouring) {
add_ether_header(raw_data, packet_io->config.src_mac, packet_io->config.tap_mac);
if (packet_io->config.enable_iouring) {
io_uring_submit_write_entry(thread->tap_ctx->io_uring_fd, raw_data, raw_len);
}
else {
@@ -1239,6 +1490,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
throughput_metrics_inc(&g_metrics->tap_pkt_tx, 1, raw_len);
}
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
time_echo(meta.session_id, "raw pkg from nf end");
return 0;
}
@@ -1246,6 +1498,91 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
/******************************************************************************
* EXTERN
******************************************************************************/
// Accessor: non-zero when io_uring tap I/O was enabled in the PACKET_IO
// configuration. `handle` must be non-NULL (dereferenced unconditionally).
int is_enable_iouring(struct packet_io *handle)
{
return handle->config.enable_iouring;
}
/*
 * Tear down a per-thread tap context (counterpart of tfe_tap_ctx_create).
 * Order matters: io_uring instances are destroyed before their backing
 * eventfds and tap fds are closed. NULL is tolerated. Fields left zero by
 * the calloc'd creator are skipped via the `> 0` eventfd guards.
 * NOTE(review): tap_close() is called even when tap_open() failed (-1 fds)
 * — assumed harmless; confirm tap_close handles negative fds.
 */
void tfe_tap_ctx_destory(struct tap_ctx *handler)
{
if (handler) {
io_uring_instance_destory(handler->io_uring_fd);
io_uring_instance_destory(handler->io_uring_c);
io_uring_instance_destory(handler->io_uring_s);
if (handler->eventfd > 0)
close(handler->eventfd);
if (handler->eventfd_c > 0)
close(handler->eventfd_c);
if (handler->eventfd_s > 0)
close(handler->eventfd_s);
tap_close(handler->tap_fd);
tap_close(handler->tap_c);
tap_close(handler->tap_s);
free(handler);
}
}
/*
 * Per-thread tap setup: opens the three tap queues (plain / client / server),
 * brings the links up and creates one eventfd per tap. When io_uring is
 * enabled, attaches the shared BPF steering program to each tap fd, creates
 * an io_uring instance per tap and registers the eventfds with marsio
 * polling. Optionally pins RPS for this worker thread.
 * Returns the new context, or NULL on failure (partial state is torn down
 * by tfe_tap_ctx_destory, which tolerates half-initialized contexts).
 *
 * Fix: tap_open() and eventfd() results were previously used unchecked;
 * failures now route to error_out.
 */
struct tap_ctx *tfe_tap_ctx_create(void *ctx)
{
    int ret = 0;
    struct acceptor_thread_ctx *thread_ctx = (struct acceptor_thread_ctx *)ctx;
    struct acceptor_kni_v4 *acceptor_ctx = thread_ctx->ref_acceptor_ctx;
    struct packet_io *packet_io = acceptor_ctx->io;

    struct tap_ctx *tap_ctx = (struct tap_ctx *)calloc(1, sizeof(struct tap_ctx));
    assert(tap_ctx != NULL);

    tap_ctx->tap_fd = tap_open(packet_io->config.dev_tap, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE);
    tap_ctx->tap_c = tap_open(packet_io->config.dev_tap_c, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE);
    tap_ctx->tap_s = tap_open(packet_io->config.dev_tap_s, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE);
    /* fix: a failed tap_open() used to flow into bpf attach / io_uring below */
    if (tap_ctx->tap_fd < 0 || tap_ctx->tap_c < 0 || tap_ctx->tap_s < 0)
        goto error_out;

    tap_up_link(packet_io->config.dev_tap);
    tap_up_link(packet_io->config.dev_tap_c);
    tap_up_link(packet_io->config.dev_tap_s);

    // fcntl(2) with EFD_NONBLOCK does not take effect afterwards, so the
    // non-blocking flag must be requested at creation time.
    tap_ctx->eventfd = eventfd(0, EFD_NONBLOCK);
    tap_ctx->eventfd_c = eventfd(0, EFD_NONBLOCK);
    tap_ctx->eventfd_s = eventfd(0, EFD_NONBLOCK);
    /* fix: eventfd() failures were previously ignored */
    if (tap_ctx->eventfd < 0 || tap_ctx->eventfd_c < 0 || tap_ctx->eventfd_s < 0)
        goto error_out;

    if (packet_io->config.enable_iouring) {
        bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_fd);
        bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_c);
        bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_s);
        tap_ctx->io_uring_fd = io_uring_instance_create(tap_ctx->tap_fd, tap_ctx->eventfd, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog);
        if (tap_ctx->io_uring_fd == NULL)
            goto error_out;
        tap_ctx->io_uring_c = io_uring_instance_create(tap_ctx->tap_c, tap_ctx->eventfd_c, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog);
        if (tap_ctx->io_uring_c == NULL)
            goto error_out;
        tap_ctx->io_uring_s = io_uring_instance_create(tap_ctx->tap_s, tap_ctx->eventfd_s, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog);
        if (tap_ctx->io_uring_s == NULL)
            goto error_out;
        marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd, thread_ctx->thread_index);
        marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd_c, thread_ctx->thread_index);
        marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd_s, thread_ctx->thread_index);
    }

    if (packet_io->config.tap_rps_enable)
    {
        ret = tap_set_rps(packet_io->config.dev_tap, thread_ctx->thread_index, packet_io->config.tap_rps_mask);
        if (ret != 0)
            goto error_out;
        ret = tap_set_rps(packet_io->config.dev_tap_c, thread_ctx->thread_index, packet_io->config.tap_rps_mask);
        if (ret != 0)
            goto error_out;
        ret = tap_set_rps(packet_io->config.dev_tap_s, thread_ctx->thread_index, packet_io->config.tap_rps_mask);
        if (ret != 0)
            goto error_out;
    }
    return tap_ctx;

error_out:
    tfe_tap_ctx_destory(tap_ctx);
    return NULL;
}
int packet_io_thread_init(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx)
{
@@ -1287,6 +1624,12 @@ void packet_io_destory(struct packet_io *handle)
handle->instance = NULL;
}
if (handle->config.tap_bpf_ctx)
{
bpf_obj_unload(handle->config.tap_bpf_ctx);
handle->config.tap_bpf_ctx = NULL;
}
free(handle);
handle = NULL;
}
@@ -1304,6 +1647,19 @@ struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_
goto error_out;
}
if (handle->config.tap_allow_mutilthread)
{
handle->config.tap_bpf_ctx = bpf_obj_load(handle->config.bpf_obj, thread_num, handle->config.bpf_hash_mode, handle->config.bpf_debug_log);
if (handle->config.tap_bpf_ctx == NULL)
{
goto error_out;
}
}
else if (thread_num > 1){
TFE_LOG_ERROR(g_default_logger, "%s: under tap mode, when disable tap_allow_mutilthread, only support one work thread.", LOG_TAG_PKTIO);
goto error_out;
}
handle->instance = marsio_create();
if (handle->instance == NULL)
{
@@ -1366,6 +1722,17 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi
return 0;
}
if (handle->config.bypass_all_traffic == 1)
{
for (int i = 0; i < nr_recv; i++)
{
int raw_len = marsio_buff_datalen(rx_buffs[i]);
}
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, rx_buffs, nr_recv);
return nr_recv;
}
for (int j = 0; j < nr_recv; j++)
{
marsio_buff_t *rx_buff = rx_buffs[j];
@@ -1397,7 +1764,7 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi
void handle_decryption_packet_from_tap(const char *data, int len, void *args)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)args;
struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
struct addr_tuple4 inner_addr;
@@ -1416,6 +1783,7 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args)
return;
}
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
time_echo(s_ctx->session_id, "decryption pkg from nf start");
marsio_buff_t *tx_buffs[1];
int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index);
@@ -1435,8 +1803,9 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args)
meta.is_decrypted = SET_TRAFFIC_IS_DECRYPTED(0);
meta.is_ctrl_pkt = 0;
meta.l7offset = 0;
meta.sids.num = 1;
meta.sids.num = 2;
meta.sids.elems[0] = acceptor_ctx->sce_sids;
meta.sids.elems[1] = acceptor_ctx->proxy_sids;
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct addr_tuple4)) == 0)
meta.is_e2i_dir = s_ctx->c2s_info.is_e2i_dir;
@@ -1453,6 +1822,7 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args)
}
packet_io_set_metadata(tx_buffs[0], &meta);
marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1);
time_echo(s_ctx->session_id, "decryption pkg from nf end");
}
void handle_raw_packet_from_tap(const char *data, int len, void *args)
@@ -1460,7 +1830,7 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
char *src_mac = NULL;
char *dst_mac = NULL;
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)args;
struct acceptor_ctx *acceptor_ctx = thread->ref_acceptor_ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
struct addr_tuple4 inner_addr;
@@ -1479,6 +1849,7 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
return;
}
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
time_echo(s_ctx->session_id, "raw pkg from tap start");
marsio_buff_t *tx_buffs[1];
int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index);
@@ -1519,7 +1890,6 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
}
else
{
// raw_meta_i2e->raw_data maybe is null
sids_copy(&meta.sids, &s_ctx->raw_meta_i2e->sids);
route_ctx_copy(&meta.route_ctx, &s_ctx->raw_meta_i2e->route_ctx);
}
@@ -1528,4 +1898,5 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
packet_io_set_metadata(tx_buffs[0], &meta);
add_ether_header(dst, src_mac, dst_mac);
marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1);
time_echo(s_ctx->session_id, "raw pkg from tap end");
}

View File

@@ -1,389 +0,0 @@
#include <fcntl.h>
#include <sys/ioctl.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdarg.h>
#include <arpa/inet.h>
#include <netinet/ip.h>
#include <netinet/ip6.h>
#include <netinet/udp.h>
#include <linux/if_tun.h>
#include <signal.h>
#include <assert.h>
#include <MESA/MESA_prof_load.h>
#include <net/if.h>
#include <unistd.h>
#if (SUPPORT_BPF)
#include "../../bpf/bpf_conf_user.h"
#include <bpf/bpf.h>
#include <bpf/libbpf.h>
#endif
#include "tfe_acceptor_kni.h"
#include "tfe_tap_rss.h"
#include "tfe_utils.h"
#ifndef TUN_PATH
#define TUN_PATH "/dev/net/tun"
#endif
/* State for the tap RSS BPF program loaded once per process. */
struct bpf_ctx
{
int bpf_prog_fd;        /* fd of the loaded socket-filter program */
int bpf_map_fd;         /* NOTE(review): never assigned in this file */
char bpf_file[1024];    /* path of the BPF object file */
#if (SUPPORT_BPF)
struct bpf_object *bpf_obj;  /* libbpf handle for the loaded object */
bpf_conf_t bpf_conf;         /* queue-num / hash-mode / debug config map */
#endif
};
/*
 * Return the loaded BPF program's file descriptor, or -1 when no BPF
 * context exists (e.g. BPF support disabled or load failed).
 */
int tfe_tap_get_bpf_prog_fd(struct bpf_ctx *ctx)
{
    return ctx ? ctx->bpf_prog_fd : -1;
}
#if (SUPPORT_BPF)
/*
 * Tear down a BPF context produced by tfe_tap_global_load_rss_bpf():
 * closes the program fd, releases the libbpf object, frees the struct.
 * NULL is tolerated.
 *
 * Fix: removed the dead store `ctx = NULL;` after free() — it only
 * cleared the local parameter copy, not the caller's pointer.
 */
void tfe_tap_global_unload_rss_bpf(struct bpf_ctx *ctx)
{
    if (ctx == NULL)
        return;
    if (ctx->bpf_prog_fd > 0)
    {
        close(ctx->bpf_prog_fd);
    }
    if (ctx->bpf_obj)
    {
        bpf_object__close(ctx->bpf_obj);
        ctx->bpf_obj = NULL;
    }
    free(ctx);
}
#else
/* No-op stub: compiled when SUPPORT_BPF is disabled (see #if above). */
void tfe_tap_global_unload_rss_bpf(struct bpf_ctx *ctx)
{
}
#endif
/*
* bpf_queue_num : worker thread number
* bpf_default_queue : -1: for disable(only use for debug, rss to one queue)
* bpf_hash_mode : 2: hash with src/dst addr
* 4: hash with src/dst addr and src/dst port
* bpf_debug_log : 0 for disable(only use for debug, printf bpf debug log)
*/
#if (SUPPORT_BPF)
/*
 * Load the tap RSS BPF object and populate its configuration map.
 * bpf_queue_num  : worker thread count (target queue fan-out)
 * bpf_hash_mode  : 2 = hash on src/dst addr; 4 = addr + ports
 * bpf_debug_log  : non-zero enables bpf-side debug printing
 * Returns a context for tfe_tap_get_bpf_prog_fd()/..._unload_rss_bpf(),
 * or NULL on failure.
 *
 * Fixes: calloc() result is now checked before use; strncpy is bounded by
 * the destination buffer (it was bounded by strlen(src), which provides no
 * overflow protection and no guaranteed NUL for long paths).
 */
struct bpf_ctx *tfe_tap_global_load_rss_bpf(const char *bpf_obj_file, uint32_t bpf_queue_num, uint32_t bpf_hash_mode, uint32_t bpf_debug_log, void *logger)
{
    struct bpf_ctx *ctx = (struct bpf_ctx *)calloc(1, sizeof(struct bpf_ctx));
    if (ctx == NULL)
        return NULL;
    /* calloc zeroed the buffer, so a size-1 bound keeps it NUL-terminated */
    strncpy(ctx->bpf_file, bpf_obj_file, sizeof(ctx->bpf_file) - 1);

    bpf_conf_set_debug_log(&ctx->bpf_conf, bpf_debug_log);
    bpf_conf_set_hash_mode(&ctx->bpf_conf, bpf_hash_mode);
    bpf_conf_set_queue_num(&ctx->bpf_conf, bpf_queue_num);

    if (bpf_prog_load(ctx->bpf_file, BPF_PROG_TYPE_SOCKET_FILTER, &ctx->bpf_obj, &ctx->bpf_prog_fd) < 0)
    {
        TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "unable to load bpf object %s, aborting: %s", ctx->bpf_file, strerror(errno));
        goto error;
    }
    if (bpf_conf_update_map(&ctx->bpf_conf, ctx->bpf_obj) == -1)
    {
        goto error;
    }
    return ctx;

error:
    tfe_tap_global_unload_rss_bpf(ctx);
    return NULL;
}
#else
/* Stub for builds without SUPPORT_BPF: logs an error and returns NULL. */
struct bpf_ctx *tfe_tap_global_load_rss_bpf(const char *bpf_obj_file, uint32_t bpf_queue_num, uint32_t bpf_hash_mode, uint32_t bpf_debug_log, void *logger)
{
TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "not support bpf");
return NULL;
}
#endif
/*
 * (Deleted-file variant.) Per-thread tap context: opens three multi-queue
 * tap fds, each with the shared RSS BPF program attached via
 * tfe_tap_open_per_thread().
 * NOTE(review): the three tap fds are not checked for failure before the
 * context is returned.
 */
struct tap_ctx *tfe_tap_ctx_create(void *ctx)
{
struct acceptor_thread_ctx *thread_ctx = (struct acceptor_thread_ctx *)ctx;
struct acceptor_ctx *acceptor_ctx = thread_ctx->ref_acceptor_ctx;
struct tap_ctx *tap_ctx = (struct tap_ctx *)calloc(1, sizeof(struct tap_ctx));
assert(tap_ctx != NULL);
tap_ctx->tap_fd = tfe_tap_open_per_thread(acceptor_ctx->config->tap_device, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE, tfe_tap_get_bpf_prog_fd(acceptor_ctx->config->tap_bpf_ctx), g_default_logger);
tap_ctx->tap_c = tfe_tap_open_per_thread(acceptor_ctx->config->tap_c_device, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE, tfe_tap_get_bpf_prog_fd(acceptor_ctx->config->tap_bpf_ctx), g_default_logger);
tap_ctx->tap_s = tfe_tap_open_per_thread(acceptor_ctx->config->tap_s_device, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE, tfe_tap_get_bpf_prog_fd(acceptor_ctx->config->tap_bpf_ctx), g_default_logger);
return tap_ctx;
}
/*
 * Build a tap_config from the profile file.
 *
 * Reads the [tap], [traffic_steering], [io_uring] and [system]
 * sections: tap device names, RPS settings, BPF RSS steering options
 * and io_uring parameters. The source MAC address is mandatory.
 * When tap_allow_mutilthread is set the RSS BPF object is loaded with
 * one queue per worker thread; otherwise only a single worker thread
 * is supported.
 *
 * Returns a heap-allocated tap_config, or NULL on error (partially
 * built state is released via tfe_tap_destory()).
 */
struct tap_config *tfe_tap_config_create(const char *profile, int thread_num)
{
int ret = 0;
int tap_allow_mutilthread = 0;
uint32_t bpf_debug_log = 0;
uint32_t bpf_hash_mode = 2;
uint32_t bpf_queue_num = thread_num;  /* one BPF steering queue per worker thread */
char bpf_obj[1024] = {0};
struct tap_config *tap = (struct tap_config *)calloc(1, sizeof(struct tap_config));
assert(tap != NULL);
MESA_load_profile_int_nodef(profile, "tap", "tap_rps_enable", &tap->tap_rps_enable);
MESA_load_profile_string_def(profile, "tap", "tap_name", tap->tap_device, sizeof(tap->tap_device), "tap0");
MESA_load_profile_string_def(profile, "traffic_steering", "device_client", tap->tap_c_device, sizeof(tap->tap_c_device), "tap_c");
MESA_load_profile_string_def(profile, "traffic_steering", "device_server", tap->tap_s_device, sizeof(tap->tap_s_device), "tap_s");
MESA_load_profile_int_nodef(profile, "tap", "bpf_debug_log", (int *)&bpf_debug_log);
MESA_load_profile_int_nodef(profile, "tap", "bpf_hash_mode", (int *)&bpf_hash_mode);
MESA_load_profile_string_nodef(profile, "tap", "bpf_obj", bpf_obj, sizeof(bpf_obj));
MESA_load_profile_int_nodef(profile, "tap", "tap_allow_mutilthread", &tap_allow_mutilthread);
MESA_load_profile_int_nodef(profile, "io_uring", "enable_iouring", &tap->enable_iouring);
MESA_load_profile_int_nodef(profile, "io_uring", "enable_debuglog", &tap->enable_debuglog);
MESA_load_profile_int_nodef(profile, "io_uring", "ring_size", &tap->ring_size);
MESA_load_profile_int_nodef(profile, "io_uring", "buff_size", &tap->buff_size);
MESA_load_profile_int_nodef(profile, "io_uring", "flags", &tap->flags);
MESA_load_profile_int_nodef(profile, "io_uring", "sq_thread_idle", &tap->sq_thread_idle);
/* src_mac_addr is the only mandatory key: creation fails without it. */
char src_mac_addr_str[TFE_SYMBOL_MAX];
ret = MESA_load_profile_string_nodef(profile, "system", "src_mac_addr", src_mac_addr_str, sizeof(src_mac_addr_str));
if(ret < 0){
TFE_LOG_ERROR(g_default_logger, "MESA_prof_load: src_mac_addr not set, profile = %s, section = system", profile);
goto error_out;
}
str_to_mac(src_mac_addr_str, tap->src_mac);
/* Cache the MAC of each tap device for later packet construction.
 * NOTE(review): return values are unchecked -- presumably the devices
 * exist at this point; confirm against device setup order. */
get_mac_by_device_name(tap->tap_device, tap->tap_mac);
get_mac_by_device_name(tap->tap_c_device, tap->tap_c_mac);
get_mac_by_device_name(tap->tap_s_device, tap->tap_s_mac);
if (tap->tap_rps_enable)
{
/* RPS needs an explicit CPU mask; there is no sensible default. */
if (MESA_load_profile_string_nodef(profile, "tap", "tap_rps_mask", tap->tap_rps_mask, sizeof(tap->tap_rps_mask)) < 0)
{
TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "under tap mode, when enable tap_rps_enable, tap_rps_mask is required.");
goto error_out;
}
}
if (tap_allow_mutilthread)
{
/* Multi-queue mode: load the RSS steering BPF object so traffic is
 * spread across the worker threads' tap queues. */
tap->tap_bpf_ctx = tfe_tap_global_load_rss_bpf(bpf_obj, bpf_queue_num, bpf_hash_mode, bpf_debug_log, g_default_logger);
if (tap->tap_bpf_ctx == NULL)
{
goto error_out;
}
}
else if (thread_num > 1){
/* Without BPF steering there is no way to fan out to >1 thread. */
TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "under tap mode, when disable tap_allow_mutilthread, only support one work thread.");
goto error_out;
}
return tap;
error_out:
tfe_tap_destory(tap);
return NULL;
}
/*
 * Release a tap_config and any RSS BPF state it owns.
 * Safe to call with NULL.
 */
void tfe_tap_destory(struct tap_config *tap)
{
    if (tap == NULL)
    {
        return;
    }
    if (tap->tap_bpf_ctx != NULL)
    {
        tfe_tap_global_unload_rss_bpf(tap->tap_bpf_ctx);
        tap->tap_bpf_ctx = NULL;
    }
    free(tap);
}
/*
 * Enable Receive Packet Steering for one tap rx queue by writing the
 * CPU mask into /sys/class/net/<tap_name>/queues/rx-<thread_num>/rps_cpus.
 *
 * Returns 0 on success, -1 on failure. The original code ignored the
 * fwrite()/fclose() results, so a rejected sysfs write (e.g. a bad
 * mask, which the kernel may only report at flush/close time) was
 * silently treated as success.
 */
int tfe_tap_set_rps(void *local_logger, const char *tap_name, int thread_num, const char *rps_mask)
{
    char file[1024] = {0};
    snprintf(file, sizeof(file), "/sys/class/net/%s/queues/rx-%d/rps_cpus", tap_name, thread_num);
    FILE *fp = fopen(file, "w");
    if (fp == NULL)
    {
        TFE_LOG_ERROR(local_logger, "%s can't open %s, %s", TAP_RSS_LOG_TAG, file, strerror(errno));
        return -1;
    }
    if (fwrite(rps_mask, strlen(rps_mask), 1, fp) != 1)
    {
        TFE_LOG_ERROR(local_logger, "%s can't write '%s' to %s, %s", TAP_RSS_LOG_TAG, rps_mask, file, strerror(errno));
        fclose(fp);
        return -1;
    }
    /* sysfs often reports invalid values only when the buffer is flushed,
     * so the fclose() result must be checked as well. */
    if (fclose(fp) != 0)
    {
        TFE_LOG_ERROR(local_logger, "%s can't apply '%s' to %s, %s", TAP_RSS_LOG_TAG, rps_mask, file, strerror(errno));
        return -1;
    }
    TFE_LOG_DEBUG(local_logger, TAP_RSS_LOG_TAG "set rps '%s' to %s", rps_mask, file);
    return 0;
}
/*
 * Open one multi-queue tap descriptor for a worker thread.
 *
 * Steps: open /dev/net/tun and attach it to tap_dev with tap_flags;
 * optionally install the RSS steering BPF program (bpf_prog_fd > 0);
 * switch the fd to non-blocking mode; query the MTU and bring the link
 * up via a throwaway datagram socket.
 *
 * Returns the tap fd on success, -1 on failure. The helper socket is
 * always closed before returning.
 */
int tfe_tap_open_per_thread(const char *tap_dev, int tap_flags, int bpf_prog_fd, void *logger)
{
    int fd = -1;
    int tap_fd = -1;
    int nonblock_flags = -1;
    struct ifreq ifr;
    tap_fd = open(TUN_PATH, O_RDWR);
    if (tap_fd == -1)
    {
        TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "unable to open " TUN_PATH ", aborting: %s", strerror(errno));
        return -1;
    }
    memset(&ifr, 0, sizeof(ifr));
    ifr.ifr_flags = tap_flags;
    /* Bounded copy: the original strcpy could overflow ifr_name
     * (IFNAMSIZ bytes) when tap_dev exceeds the kernel's name limit. */
    snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", tap_dev);
    if (ioctl(tap_fd, TUNSETIFF, &ifr) == -1)
    {
        TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "unable to attach %s, aborting: %s", tap_dev, strerror(errno));
        goto error;
    }
    /*
     * TUNSETPERSIST is intentionally NOT used: the interface should be
     * destroyed when the last fd referencing it is closed.
     */
#if (SUPPORT_BPF)
    if (bpf_prog_fd > 0)
    {
        /* Install the queue-steering BPF program on this tap device. */
        if (ioctl(tap_fd, TUNSETSTEERINGEBPF, (void *)&bpf_prog_fd) == -1)
        {
            TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "unable to set bpf on %s, aborting: %s", tap_dev, strerror(errno));
            goto error;
        }
    }
#endif
    /* Set nonblock: readers poll this fd from the worker loop. */
    nonblock_flags = fcntl(tap_fd, F_GETFL);
    if (nonblock_flags == -1)
    {
        TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "unable to get nonblock flags on %s fd, aborting: %s", tap_dev, strerror(errno));
        goto error;
    }
    nonblock_flags |= O_NONBLOCK;
    if (fcntl(tap_fd, F_SETFL, nonblock_flags) == -1)
    {
        TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "unable to set nonblock flags on %s fd, aborting: %s", tap_dev, strerror(errno));
        goto error;
    }
    /* Get MTU via a throwaway datagram socket (the tap fd itself does
     * not accept SIOCGIFMTU). */
    fd = socket(PF_INET, SOCK_DGRAM, 0);
    if (fd == -1)
    {
        TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "unable to create socket, aborting: %s", strerror(errno));
        goto error;
    }
    memset(&ifr, 0, sizeof(ifr));
    snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", tap_dev);
    if (ioctl(fd, SIOCGIFMTU, &ifr) < 0)
    {
        TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "unable to get MTU on %s, aborting: %s", tap_dev, strerror(errno));
        goto error;
    }
    /* Bring the interface up if it is not already. */
    if (ioctl(fd, SIOCGIFFLAGS, &ifr) == -1)
    {
        TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "unable to get link status on %s, aborting: %s", tap_dev, strerror(errno));
        goto error;
    }
    if ((ifr.ifr_flags & IFF_UP) == 0)
    {
        ifr.ifr_flags |= IFF_UP;
        if (ioctl(fd, SIOCSIFFLAGS, &ifr) < 0)
        {
            TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "unable to set link status on %s, aborting: %s", tap_dev, strerror(errno));
            goto error;
        }
    }
    TFE_LOG_INFO(logger, TAP_RSS_LOG_TAG "using tap device %s with MTU %d", tap_dev, ifr.ifr_mtu);
    close(fd);
    return tap_fd;
error:
    /* >= 0, not > 0: descriptor 0 is valid and must also be closed. */
    if (fd >= 0)
    {
        close(fd);
        fd = -1;
    }
    if (tap_fd >= 0)
    {
        close(tap_fd);
        tap_fd = -1;
    }
    return -1;
}
/*
 * Close a per-thread tap descriptor obtained from
 * tfe_tap_open_per_thread(). Descriptors <= 0 are ignored.
 */
void tfe_tap_close_per_thread(int tap_fd)
{
    if (tap_fd <= 0)
    {
        return;
    }
    close(tap_fd);
}
/*
 * Read one packet from a non-blocking tap fd into buff.
 * Returns the read() result: byte count, 0, or negative on error.
 * EAGAIN / EWOULDBLOCK just mean "nothing pending" and are not logged.
 * Note: errors go to g_default_logger; the logger parameter is unused.
 */
int tfe_tap_read_per_thread(int tap_fd, char *buff, int buff_size, void *logger)
{
    const int n_read = read(tap_fd, buff, buff_size);
    if (n_read >= 0)
    {
        return n_read;
    }
    if (errno != EWOULDBLOCK && errno != EAGAIN)
    {
        TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "unable to read data from tapfd %d, aborting: %s", tap_fd, strerror(errno));
    }
    return n_read;
}
/*
 * Write one packet to a tap fd. Returns the write() result; anything
 * other than a full write of data_len bytes is logged as an error.
 * Note: errors go to g_default_logger; the logger parameter is unused.
 */
int tfe_tap_write_per_thread(int tap_fd, const char *data, int data_len, void *logger)
{
    const int n_sent = write(tap_fd, data, data_len);
    if (n_sent == data_len)
    {
        return n_sent;
    }
    TFE_LOG_ERROR(g_default_logger, TAP_RSS_LOG_TAG "need send %dB, only send %dB, aborting: %s", data_len, n_sent, strerror(errno));
    return n_sent;
}

View File

@@ -1,65 +0,0 @@
#include <time.h>
#include <stdlib.h>
#include <tfe_utils.h>
#include "tfe_timestamp.h"
// 1 s = 1000 ms
// 1 ms = 1000 us
// 1 us = 1000 ns
// Cached CLOCK_MONOTONIC timestamp. The sec/nsec fields are written
// with ATOMIC_SET in timestamp_update() and read with ATOMIC_READ by
// the getters -- presumably refreshed periodically (every
// update_interval_ms) by a dedicated updater; TODO confirm the caller.
struct timestamp
{
struct timespec timestamp;   // last CLOCK_MONOTONIC sample
uint64_t update_interval_ms; // intended refresh period, milliseconds
};
/*
 * Allocate a timestamp cache, store the refresh interval and take an
 * initial CLOCK_MONOTONIC sample.
 *
 * Returns NULL on allocation failure -- the original code dereferenced
 * the calloc() result without checking it.
 */
struct timestamp *timestamp_new(uint64_t update_interval_ms)
{
    struct timestamp *ts = (struct timestamp *)calloc(1, sizeof(struct timestamp));
    if (ts == NULL)
    {
        return NULL;
    }
    ts->update_interval_ms = update_interval_ms;
    timestamp_update(ts);
    TFE_LOG_DEBUG(g_default_logger, "%s: TIMESTAMP->update_interval_ms : %lu", LOG_TAG_TIMESTAMP, timestamp_update_interval_ms(ts));
    TFE_LOG_DEBUG(g_default_logger, "%s: TIMESTAMP->current_sec : %lu", LOG_TAG_TIMESTAMP, timestamp_get_sec(ts));
    TFE_LOG_DEBUG(g_default_logger, "%s: TIMESTAMP->current_msec : %lu", LOG_TAG_TIMESTAMP, timestamp_get_msec(ts));
    return ts;
}
/* Release a timestamp cache. Safe to call with NULL. */
void timestamp_free(struct timestamp *ts)
{
    if (ts == NULL)
    {
        return;
    }
    free(ts);
}
// Refresh the cache from CLOCK_MONOTONIC.
// NOTE(review): tv_sec and tv_nsec are published with two independent
// ATOMIC_SETs, so a concurrent reader can pair a sec from one sample
// with a nsec from another (sub-second skew in timestamp_get_msec) --
// confirm that is acceptable for the consumers.
void timestamp_update(struct timestamp *ts)
{
struct timespec temp;
clock_gettime(CLOCK_MONOTONIC, &temp);
ATOMIC_SET(&(ts->timestamp.tv_sec), temp.tv_sec);
ATOMIC_SET(&(ts->timestamp.tv_nsec), temp.tv_nsec);
}
/* Accessor: the refresh period (in ms) this cache was created with. */
uint64_t timestamp_update_interval_ms(struct timestamp *ts)
{
    const uint64_t interval_ms = ts->update_interval_ms;
    return interval_ms;
}
/* Cached monotonic time, whole seconds. Lock-free read via ATOMIC_READ. */
uint64_t timestamp_get_sec(struct timestamp *ts)
{
    return (uint64_t)ATOMIC_READ(&(ts->timestamp.tv_sec));
}
/* Cached monotonic time in milliseconds: sec * 1000 + nsec / 1e6. */
uint64_t timestamp_get_msec(struct timestamp *ts)
{
    const uint64_t sec_part = (uint64_t)ATOMIC_READ(&(ts->timestamp.tv_sec)) * 1000;
    const uint64_t msec_part = (uint64_t)ATOMIC_READ(&(ts->timestamp.tv_nsec)) / 1000000;
    return sec_part + msec_part;
}

View File

@@ -4,8 +4,6 @@ enable_kni_v1=0
enable_kni_v2=0
enable_kni_v3=0
enable_kni_v4=1
firewall_sids=1001
service_chaining_sids=1002
# Only when (disable_coredump == 1 || (enable_breakpad == 1 && enable_breakpad_upload == 1)) is satisfied, the core will not be generated locally
disable_coredump=0
@@ -23,7 +21,6 @@ cpu_affinity_mask=1-9
# LEAST_CONN = 0; ROUND_ROBIN = 1
load_balance=1
src_mac_addr = 00:0e:c6:d6:72:c1
# for enable kni v3
[nfq]
@@ -179,8 +176,8 @@ enable_steering_ssl=1
so_mask_client=17
# 34: 0x22
so_mask_server=34
device_client=eth_client
device_server=eth_server
device_client=tap_c
device_server=tap_s
http_keepalive_enable=1
http_keepalive_path="/metrics"
@@ -230,20 +227,29 @@ app_name="proxy_rule_hits"
# for enable kni v4
[packet_io]
packet_io_threads=8
packet_io_cpu_affinity_mask=1-9
firewall_sids=1000
proxy_sids=1001
service_chaining_sids=1002
# bypass_all_traffic:1 NF2NF and SF2SF
bypass_all_traffic=0
rx_burst_max=128
app_symbol=sce
app_symbol=tfe
dev_nf_interface=eth_nf_interface
[tap]
src_mac_addr = 00:0e:c6:d6:72:c1
# tap config
tap_name=tap0
# 1.tap_allow_mutilthread=1 load bpf rss obj
# 2.tap_allow_mutilthread=0 not load bpf rss obj
tap_allow_mutilthread=1
bpf_obj=/opt/tsg/sapp/plug/business/kni/bpf_tun_rss_steering.o
bpf_default_queue=-1
bpf_obj=/opt/tsg/tfe/resource/bpf/bpf_tun_rss_steering.o
# tap_bpf_debug_log: cat /sys/kernel/debug/tracing/trace_pipe
bpf_debug_log=0
# 2: BPF 使用二元组分流
@@ -254,7 +260,7 @@ bpf_hash_mode=2
tap_rps_enable=1
tap_rps_mask=0,1fffffff,c0000000,00000000
[io_uring]
# iouring config
enable_iouring=1
enable_debuglog=0
ring_size=1024

View File

@@ -27,7 +27,6 @@ target_link_libraries(tfe pthread dl nfnetlink
MESA_field_stat
fieldstat3
breakpad_mini
msgpack
${SYSTEMD_LIBRARIES})
if(ENABLE_PLUGIN_HTTP)

View File

@@ -1,13 +1,6 @@
#pragma once
struct tfe_proxy;
struct acceptor_kni_v4
{
struct tfe_proxy *proxy;
const char *profile;
struct acceptor_ctx *acceptor;
};
struct acceptor_kni_v4 *acceptor_kni_v4_create(struct tfe_proxy *proxy, const char *profile, void *logger);
void acceptor_kni_v4_destroy();

View File

@@ -6,22 +6,89 @@
#include <linux/netfilter.h> // for NF_ACCEPT
#include <libnetfilter_queue/libnetfilter_queue.h>
#include <linux/if_tun.h>
#include <MESA/MESA_prof_load.h>
#include <bpf_obj.h>
#include <tfe_utils.h>
#include <tfe_cmsg.h>
#include <proxy.h>
#include <tfe_acceptor_kni.h>
#include "io_uring.h"
#include "tfe_tap_rss.h"
#include "tfe_metrics.h"
#include "tfe_tcp_restore.h"
#include "acceptor_kni_v4.h"
#include "tap.h"
#include "tfe_packet_io.h"
#include "tfe_session_table.h"
/*
 * File-local copy: read one packet from a non-blocking tap fd.
 * Returns the read() result; EAGAIN / EWOULDBLOCK are expected on an
 * empty queue and not logged. Note: the logger parameter is unused;
 * errors go to g_default_logger.
 */
static int tfe_tap_read_per_thread(int tap_fd, char *buff, int buff_size, void *logger)
{
int ret = read(tap_fd, buff, buff_size);
if (ret < 0)
{
if (errno != EWOULDBLOCK && errno != EAGAIN)
{
TFE_LOG_ERROR(g_default_logger, "%s: unable to read data from tapfd %d, aborting: %s", LOG_TAG_PKTIO, tap_fd, strerror(errno));
}
}
return ret;
}
/*
 * Tear down an acceptor context: release the packet IO engine and the
 * global metrics, then the context itself. Safe to call with NULL.
 * NOTE(review): assumes packet_io_destory/global_metrics_destory
 * tolerate NULL members (ctx may be partially built) -- confirm.
 */
void acceptor_ctx_destory(struct acceptor_kni_v4 * ctx)
{
if (ctx)
{
packet_io_destory(ctx->io);
global_metrics_destory(ctx->metrics);
free(ctx);
ctx = NULL;
}
return;
}
/*
 * Build the kni_v4 acceptor context from the profile: service-chain /
 * firewall / proxy stream IDs, worker-thread count (capped at
 * TFE_THREAD_MAX), a CPU affinity mask for those workers, the packet
 * IO engine and the global metrics.
 *
 * Returns NULL on failure; partial state is released via
 * acceptor_ctx_destory().
 *
 * NOTE(review): keys are looked up in section "PACKET_IO" while the
 * sample profile uses "[packet_io]" -- confirm MESA profile lookups
 * are case-insensitive.
 */
struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile)
{
struct acceptor_kni_v4 *ctx = ALLOC(struct acceptor_kni_v4, 1);
MESA_load_profile_int_def(profile, "PACKET_IO", "firewall_sids", (int *)&(ctx->firewall_sids), 1000);
MESA_load_profile_int_def(profile, "PACKET_IO", "proxy_sids", (int *)&(ctx->proxy_sids), 1001);
MESA_load_profile_int_def(profile, "PACKET_IO", "service_chaining_sids", (int *)&(ctx->sce_sids), 1002);
MESA_load_profile_int_def(profile, "PACKET_IO", "packet_io_threads", (int *)&(ctx->nr_worker_threads), 8);
MESA_load_profile_uint_range(profile, "system", "cpu_affinity_mask", TFE_THREAD_MAX, (unsigned int *)ctx->cpu_affinity_mask);
ctx->nr_worker_threads = MIN(ctx->nr_worker_threads, TFE_THREAD_MAX);
/* Translate the per-thread CPU list into a cpu_set_t for packet_io. */
CPU_ZERO(&ctx->coremask);
for (int i = 0; i < ctx->nr_worker_threads; i++)
{
int cpu_id = ctx->cpu_affinity_mask[i];
CPU_SET(cpu_id, &ctx->coremask);
}
ctx->io = packet_io_create(profile, ctx->nr_worker_threads, &ctx->coremask);
if (ctx->io == NULL)
{
goto error_out;
}
ctx->metrics = global_metrics_create();
if (ctx->metrics == NULL)
{
goto error_out;
}
return ctx;
error_out:
acceptor_ctx_destory(ctx);
return NULL;
}
static void *worker_thread_cycle(void *arg)
{
struct acceptor_thread_ctx *thread_ctx = (struct acceptor_thread_ctx *)arg;
struct packet_io *handle = thread_ctx->ref_io;
struct acceptor_ctx *acceptor_ctx = thread_ctx->ref_acceptor_ctx;
int pkg_len = 0;
char thread_name[16];
@@ -38,7 +105,7 @@ static void *worker_thread_cycle(void *arg)
goto error_out;
}
if (acceptor_ctx->config->enable_iouring) {
if (is_enable_iouring(handle)) {
io_uring_register_read_callback(thread_ctx->tap_ctx->io_uring_fd, handle_raw_packet_from_tap, thread_ctx);
io_uring_register_read_callback(thread_ctx->tap_ctx->io_uring_c, handle_decryption_packet_from_tap, thread_ctx);
io_uring_register_read_callback(thread_ctx->tap_ctx->io_uring_s, handle_decryption_packet_from_tap, thread_ctx);
@@ -52,10 +119,10 @@ static void *worker_thread_cycle(void *arg)
while(1) {
n_pkt_recv_from_nf = packet_io_polling_nf_interface(handle, thread_ctx->thread_index, thread_ctx);
if (acceptor_ctx->config->enable_iouring) {
if (is_enable_iouring(handle)) {
n_pkt_recv_from_tap = io_uring_peek_ready_entrys(thread_ctx->tap_ctx->io_uring_fd);
n_pkt_recv_from_tap_c = io_uring_peek_ready_entrys(thread_ctx->tap_ctx->io_uring_c);
n_pkt_recv_from_tap_c = io_uring_peek_ready_entrys(thread_ctx->tap_ctx->io_uring_s);
n_pkt_recv_from_tap_s = io_uring_peek_ready_entrys(thread_ctx->tap_ctx->io_uring_s);
}
else {
if ((pkg_len = tfe_tap_read_per_thread(thread_ctx->tap_ctx->tap_fd, thread_ctx->tap_ctx->buff, thread_ctx->tap_ctx->buff_size, g_default_logger)) > 0)
@@ -63,22 +130,22 @@ static void *worker_thread_cycle(void *arg)
handle_raw_packet_from_tap(thread_ctx->tap_ctx->buff, pkg_len, thread_ctx);
}
// if ((pkg_len = tfe_tap_read_per_thread(thread_ctx->tap_ctx->tap_c, thread_ctx->tap_ctx->buff, thread_ctx->tap_ctx->buff_size, g_default_logger)) > 0)
// {
// handle_decryption_packet_from_tap(thread_ctx->tap_ctx->buff, pkg_len, thread_ctx);
// }
if ((pkg_len = tfe_tap_read_per_thread(thread_ctx->tap_ctx->tap_c, thread_ctx->tap_ctx->buff, thread_ctx->tap_ctx->buff_size, g_default_logger)) > 0)
{
handle_decryption_packet_from_tap(thread_ctx->tap_ctx->buff, pkg_len, thread_ctx);
}
// if ((pkg_len = tfe_tap_read_per_thread(thread_ctx->tap_ctx->tap_s, thread_ctx->tap_ctx->buff, thread_ctx->tap_ctx->buff_size, g_default_logger)) > 0)
// {
// handle_decryption_packet_from_tap(thread_ctx->tap_ctx->buff, pkg_len, thread_ctx);
// }
if ((pkg_len = tfe_tap_read_per_thread(thread_ctx->tap_ctx->tap_s, thread_ctx->tap_ctx->buff, thread_ctx->tap_ctx->buff_size, g_default_logger)) > 0)
{
handle_decryption_packet_from_tap(thread_ctx->tap_ctx->buff, pkg_len, thread_ctx);
}
}
// if (n_pkt_recv_from_nf == 0)
// {
// packet_io_thread_wait(handle, thread_ctx, 0);
// }
if (n_pkt_recv_from_nf == 0 && n_pkt_recv_from_tap == 0 && n_pkt_recv_from_tap_c == 0 && n_pkt_recv_from_tap_s == 0)
{
packet_io_thread_wait(handle, thread_ctx, -1);
}
if (__atomic_fetch_add(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED) > 0)
{
@@ -92,17 +159,21 @@ error_out:
return (void *)NULL;
}
void acceptor_kni_v4_destroy()
void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx)
{
if (ctx)
{
packet_io_destory(ctx->io);
free(ctx);
ctx = NULL;
}
return;
}
struct acceptor_kni_v4 *acceptor_kni_v4_create(struct tfe_proxy *proxy, const char *profile, void *logger)
{
int ret = 0;
struct acceptor_kni_v4 *__ctx = (struct acceptor_kni_v4 *)calloc(1, sizeof(struct acceptor_kni_v4));
struct acceptor_ctx *acceptor_ctx = acceptor_ctx_create(profile);
struct acceptor_kni_v4 *acceptor_ctx = acceptor_ctx_create(profile);
if (acceptor_ctx == NULL)
goto error_out;
@@ -113,27 +184,15 @@ struct acceptor_kni_v4 *acceptor_kni_v4_create(struct tfe_proxy *proxy, const ch
acceptor_ctx->work_threads[i].ref_acceptor_ctx = acceptor_ctx;
acceptor_ctx->work_threads[i].tap_ctx = tfe_tap_ctx_create(&acceptor_ctx->work_threads[i]);
if (acceptor_ctx->config->enable_iouring) {
int eventfd = 0;
struct tap_ctx *tap_ctx = acceptor_ctx->work_threads[i].tap_ctx;
tap_ctx->io_uring_fd = io_uring_instance_create(tap_ctx->tap_fd, eventfd, acceptor_ctx->config->ring_size, acceptor_ctx->config->buff_size, acceptor_ctx->config->flags, acceptor_ctx->config->sq_thread_idle, acceptor_ctx->config->enable_debuglog);
tap_ctx->io_uring_c = io_uring_instance_create(tap_ctx->tap_c, eventfd, acceptor_ctx->config->ring_size, acceptor_ctx->config->buff_size, acceptor_ctx->config->flags, acceptor_ctx->config->sq_thread_idle, acceptor_ctx->config->enable_debuglog);
tap_ctx->io_uring_s = io_uring_instance_create(tap_ctx->tap_s, eventfd, acceptor_ctx->config->ring_size, acceptor_ctx->config->buff_size, acceptor_ctx->config->flags, acceptor_ctx->config->sq_thread_idle, acceptor_ctx->config->enable_debuglog);
}
if (acceptor_ctx->work_threads[i].tap_ctx == NULL)
goto error_out;
acceptor_ctx->work_threads[i].session_table = session_table_create();
acceptor_ctx->work_threads[i].ref_io = acceptor_ctx->io;
acceptor_ctx->work_threads[i].ref_proxy = proxy;
acceptor_ctx->work_threads[i].ref_tap_config = acceptor_ctx->config;
acceptor_ctx->work_threads[i].ref_metrics = acceptor_ctx->metrics;
acceptor_ctx->work_threads[i].ref_acceptor_ctx = acceptor_ctx;
acceptor_ctx->work_threads[i].session_table_need_reset = 0;
if (acceptor_ctx->config->tap_rps_enable)
{
ret = tfe_tap_set_rps(g_default_logger, acceptor_ctx->config->tap_device, i, acceptor_ctx->config->tap_rps_mask);
if (ret != 0)
goto error_out;
}
}
for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) {
@@ -144,9 +203,14 @@ struct acceptor_kni_v4 *acceptor_kni_v4_create(struct tfe_proxy *proxy, const ch
}
}
return __ctx;
return acceptor_ctx;
error_out:
acceptor_kni_v4_destroy();
for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) {
tfe_tap_ctx_destory(acceptor_ctx->work_threads[i].tap_ctx);
session_table_destory(acceptor_ctx->work_threads[i].session_table);
}
acceptor_kni_v4_destroy(acceptor_ctx);
return NULL;
}

View File

@@ -60,7 +60,6 @@
/* Systemd */
#include <systemd/sd-daemon.h>
#include "tfe_acceptor_kni.h"
extern struct tcp_policy_enforcer *tcp_policy_enforcer_create(void *logger);
extern struct chaining_policy_enforcer *chaining_policy_enforcer_create(void *logger);

16
vendor/CMakeLists.txt vendored
View File

@@ -375,19 +375,3 @@ set_property(TARGET libnetfilter_queue-static PROPERTY INTERFACE_INCLUDE_DIRECTO
#add_dependencies(gperftools-static gperftools)
#set_property(TARGET gperftools-static PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib/libtcmalloc.a)
#set_property(TARGET gperftools-static PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include)
### msgpack-c 6.0.0
ExternalProject_Add(msgpack-c PREFIX msgpack-c
URL ${CMAKE_CURRENT_SOURCE_DIR}/msgpack-c-6.0.0.tar.gz
URL_MD5 adc08f48550ce772fe24c0b41166b0de
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=<INSTALL_DIR>
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DMSGPACK_BUILD_TESTS=OFF)
ExternalProject_Get_Property(msgpack-c INSTALL_DIR)
file(MAKE_DIRECTORY ${INSTALL_DIR}/include)
add_library(msgpack STATIC IMPORTED GLOBAL)
add_dependencies(msgpack msgpack-c)
set_property(TARGET msgpack PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib/libmsgpack-c.a)
set_property(TARGET msgpack PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include)

Binary file not shown.