// tango-tfe/common/src/tfe_packet_io.cpp

#include <assert.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <netinet/ether.h>
#include <linux/if_tun.h>
#include <sys/eventfd.h>
#include <marsio.h>
#include <cjson/cJSON.h>
#include <MESA/MESA_prof_load.h>
#include <tfe_utils.h>
#include <proxy.h>
#include <intercept_policy.h>
#include <unistd.h>
#include <time.h>
#include "tfe_ctrl_packet.h"
#include "packet.h"
#include "ipv4_helpers.h"
#include "tcp_helpers.h"
#include "io_uring.h"
#include "tfe_packet_io_fs.h"
#include "tfe_cmsg.h"
#include "tfe_tcp_restore.h"
#include "tfe_stream.h"
#include "packet_construct.h"
#include "mpack.h"
#include "tap.h"
#include "bpf_obj.h"
#include "tfe_session_table.h"
#include "tfe_packet_io.h"
#include "tfe_fieldstat.h"
#include "dablooms.h"
#include "timestamp.h"
/******************************************************************************
* Struct
******************************************************************************/
#define RX_BURST_MAX 128
#define CURDIR_C2S 0x1
#define CURDIR_S2C 0x2
#define IS_SINGLE 0x01
#define IS_TUNNEL 0x02
#define TRAFFIC_IS_DECRYPTED (1 << 0)
#define SET_TRAFFIC_IS_DECRYPTED(field) (field | TRAFFIC_IS_DECRYPTED)
#define CLEAR_TRAFFIC_IS_DECRYPTED(field) (field & ~TRAFFIC_IS_DECRYPTED)
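// Usage sketch (mirrors the MR_BUFF_USER_0 handling further below): bit 0 of the
// per-buffer user word marks decrypted traffic.
//   uint16_t user_data = 0;
//   user_data = SET_TRAFFIC_IS_DECRYPTED(user_data);        // mark as decrypted
//   int is_decrypted = !!(user_data & TRAFFIC_IS_DECRYPTED); // test the flag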
struct config
{
int bypass_all_traffic;
int rx_burst_max;
int enable_iouring;
int enable_debuglog;
int ring_size;
int buff_size;
int flags;
int sq_thread_idle;
int bpf_debug_log;
int bpf_hash_mode;
int tap_allow_mutilthread;
char bpf_obj[1024];
char src_mac[6];
char tap_mac[6];
char tap_c_mac[6];
char tap_s_mac[6];
char dev_tap[16];
char dev_tap_c[16];
char dev_tap_s[16];
int tap_rps_enable;
char tap_rps_mask[TFE_SYMBOL_MAX];
char app_symbol[256];
char dev_nf_interface[256];
struct bpf_obj_ctx *tap_bpf_ctx;
};
struct device
{
struct mr_vdev *mr_dev;
struct mr_sendpath *mr_path;
};
struct packet_io
{
int thread_num;
struct mr_instance *instance;
struct device dev_nf_interface;
struct config config;
};
enum raw_pkt_action
{
RAW_PKT_ERR_BYPASS,
RAW_PKT_HIT_BYPASS,
RAW_PKT_HIT_BLOCK,
RAW_PKT_HIT_STEERING,
RAW_PKT_HIT_MIRRORING,
};
enum inject_pkt_action
{
INJT_PKT_ERR_DROP,
INJT_PKT_MIRR_RX_DROP,
INJT_PKT_HIT_BLOCK,
INJT_PKT_HIT_FWD2SF, // forward to service function
INJT_PKT_HIT_FWD2NF, // forward to network function
};
struct metadata
{
int write_ref;
uint64_t session_id;
char *raw_data;
int raw_len;
int is_e2i_dir;
int is_ctrl_pkt;
uint16_t l7offset; // only control packet set l7offset
uint16_t is_decrypted;
struct sids sids;
struct route_ctx route_ctx;
};
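// Flow key for the duplicate-packet bloom filter below; packed so the raw bytes can be
// hashed directly by expiry_dablooms_add()/expiry_dablooms_search().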
struct packet_identify
{
// TCP
uint32_t tcp_seq;
uint32_t tcp_ack;
uint16_t sport;
uint16_t dport;
uint16_t tcp_checksum;
// IPv4
uint16_t ip_id;
uint32_t ip_src;
uint32_t ip_dst;
} __attribute__((__packed__));
extern int tcp_policy_enforce(struct tcp_policy_enforcer *tcp_enforcer, struct tfe_cmsg *cmsg);
extern int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstream, int fd_fake_c, int fd_fake_s, struct tfe_cmsg * cmsg);
extern void chaining_policy_enforce(struct chaining_policy_enforcer *enforcer, struct tfe_cmsg *cmsg, uint64_t rule_id);
/******************************************************************************
* dup packet filter
******************************************************************************/
// return 0: success
// return -1: error
static int get_packet_identify(struct packet *packet, struct packet_identify *key)
{
const struct layer_record *l3_layer_record = packet_get_innermost_layer(packet, LAYER_TYPE_IPV4);
if (l3_layer_record == NULL)
{
return -1;
}
const struct layer_record *l4_layer_record = packet_get_innermost_layer(packet, LAYER_TYPE_TCP);
if (l4_layer_record == NULL)
{
return -1;
}
const struct ip *iphdr = (const struct ip *)l3_layer_record->hdr_ptr;
const struct tcphdr *tcphdr = (const struct tcphdr *)l4_layer_record->hdr_ptr;
memset(key, 0, sizeof(struct packet_identify));
key->tcp_seq = tcp_hdr_get_seq(tcphdr);
key->tcp_ack = tcp_hdr_get_ack(tcphdr);
key->sport = tcp_hdr_get_sport(tcphdr);
key->dport = tcp_hdr_get_dport(tcphdr);
key->tcp_checksum = tcp_hdr_get_checksum(tcphdr);
key->ip_id = ipv4_hdr_get_ipid(iphdr);
key->ip_src = ipv4_hdr_get_src(iphdr);
key->ip_dst = ipv4_hdr_get_dst(iphdr);
return 0;
}
static void add_packet_to_dablooms(struct packet *packet, struct expiry_dablooms_handle *handle)
{
struct packet_identify identify;
if (get_packet_identify(packet, &identify) == -1)
{
return;
}
expiry_dablooms_add(handle, (const char *)&identify, sizeof(struct packet_identify), timestamp_get_sec());
}
// return 1: hit
// return 0: no hit
static int search_packet_from_dablooms(struct packet *packet, struct expiry_dablooms_handle *handle)
{
struct packet_identify identify;
if (get_packet_identify(packet, &identify) == -1)
{
return 0;
}
if (expiry_dablooms_search(handle, (const char *)&identify, sizeof(struct packet_identify), timestamp_get_sec()) == 1)
{
return 1;
}
return 0;
}
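// Pairing sketch (illustrative only): search_packet_from_dablooms() is used by
// handle_raw_packet_from_nf() below when dup_packet_filter_enable is set;
// add_packet_to_dablooms() is expected to record a packet where it is first forwarded
// (not shown in this excerpt).
//   struct packet pkt;
//   packet_parse(&pkt, raw_data, raw_len);
//   if (search_packet_from_dablooms(&pkt, thread->dup_packet_filter) == 1)
//       return; /* duplicate: bypass */
//   add_packet_to_dablooms(&pkt, thread->dup_packet_filter);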
/******************************************************************************
* STATIC
******************************************************************************/
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff)
{
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr)))
{
return 0;
}
struct ethhdr *eth_hdr = (struct ethhdr *)raw_data;
if (eth_hdr->h_proto == 0xAAAA) // keepalive marker; 0xAAAA reads the same in either byte order, so no ntohs() needed
{
return 1;
}
else
{
return 0;
}
}
static int tap_write(int tap_fd, const char *data, int data_len, void *logger)
{
int ret = write(tap_fd, data, data_len);
if (ret != data_len)
{
TFE_LOG_ERROR(logger, "%s: need send %dB, only send %dB, aborting: %s", LOG_TAG_PKTIO, data_len, ret, strerror(errno));
}
return ret;
}
static struct metadata *metadata_new()
{
struct metadata *meta = (struct metadata *)calloc(1, sizeof(struct metadata));
return meta;
}
static void metadata_deep_copy(struct metadata *dst, struct metadata *src)
{
dst->write_ref++;
dst->session_id = src->session_id;
dst->raw_data = (char *)calloc(src->raw_len, sizeof(char));
memcpy(dst->raw_data, src->raw_data, src->raw_len);
dst->raw_len = src->raw_len;
dst->l7offset = src->l7offset;
dst->is_e2i_dir = src->is_e2i_dir;
dst->is_ctrl_pkt = src->is_ctrl_pkt;
dst->is_decrypted = src->is_decrypted;
}
void metadata_free(struct metadata *meta)
{
if (meta)
{
if (meta->raw_data)
{
free(meta->raw_data);
meta->raw_data = NULL;
}
free(meta);
meta = NULL;
}
}
static struct session_ctx *session_ctx_new()
{
struct session_ctx *ctx = (struct session_ctx *)calloc(1, sizeof(struct session_ctx));
assert(ctx != NULL);
return ctx;
}
static void session_ctx_free(struct session_ctx *ctx)
{
if (ctx)
{
if (ctx->cmsg)
{
tfe_cmsg_destroy(&ctx->cmsg);
}
if (ctx->ctrl_meta)
{
metadata_free(ctx->ctrl_meta);
ctx->ctrl_meta = NULL;
}
if (ctx->c2s_info.header_data) {
free(ctx->c2s_info.header_data);
ctx->c2s_info.header_data = NULL;
}
if (ctx->s2c_info.header_data) {
free(ctx->s2c_info.header_data);
ctx->s2c_info.header_data = NULL;
}
free(ctx);
ctx = NULL;
}
}
static void session_value_free_cb(void *ctx)
{
struct session_ctx *s_ctx = (struct session_ctx *)ctx;
session_ctx_free(s_ctx);
}
static int add_ether_header(void *raw_data, char *src_mac, char *dst_mac){
struct ethhdr *ether_hdr = (struct ethhdr*)raw_data;
memcpy(ether_hdr->h_dest, dst_mac, sizeof(ether_hdr->h_dest));
memcpy(ether_hdr->h_source, src_mac, sizeof(ether_hdr->h_source));
return 0;
}
static int add_ether_proto(void *raw_data, uint16_t proto){
struct ethhdr *ether_hdr = (struct ethhdr*)raw_data;
ether_hdr->h_proto = htons(proto); // proto is a host-order ethertype, e.g. ETH_P_IP or ETH_P_IPV6
return 0;
}
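// Together these rewrite the outer Ethernet header in place before a frame is handed to a
// tap device, e.g. add_ether_header(buf, src_mac, dst_mac); add_ether_proto(buf, ETH_P_IP);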
static int overwrite_tcp_mss(struct tfe_cmsg *cmsg, struct tcp_restore_info *restore, uint64_t session_id, void *logger)
{
int ret = 0;
uint16_t size = 0;
int server_side_mss_enable = 0;
int server_side_mss_value = 0;
int client_side_mss_enable = 0;
int client_side_mss_value = 0;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_DOWNSTREAM_TCP_MSS_ENABLE, (unsigned char *)&client_side_mss_enable, sizeof(client_side_mss_enable), &size);
if (ret < 0)
{
TFE_LOG_ERROR(logger, "%s: session %lu failed at fetch client side tcp mss from cmsg: %s", LOG_TAG_PKTIO, session_id, strerror(-ret));
return -1;
}
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_DOWNSTREAM_TCP_MSS_VALUE, (unsigned char *)&client_side_mss_value, sizeof(client_side_mss_value), &size);
if (ret < 0)
{
TFE_LOG_ERROR(logger, "%s: session %lu failed at fetch client side tcp mss value from cmsg: %s", LOG_TAG_PKTIO, session_id, strerror(-ret));
return -1;
}
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_UPSTREAM_TCP_MSS_ENABLE, (unsigned char *)&server_side_mss_enable, sizeof(server_side_mss_enable), &size);
if (ret < 0)
{
TFE_LOG_ERROR(logger, "%s: session %lu failed at fetch server side tcp mss from cmsg: %s", LOG_TAG_PKTIO, session_id, strerror(-ret));
return -1;
}
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_UPSTREAM_TCP_MSS_VALUE, (unsigned char *)&server_side_mss_value, sizeof(server_side_mss_value), &size);
if (ret < 0)
{
TFE_LOG_ERROR(logger, "%s: session %lu failed at fetch server side tcp mss value from cmsg: %s", LOG_TAG_PKTIO, session_id, strerror(-ret));
return -1;
}
if (client_side_mss_enable)
{
restore->client.mss = client_side_mss_value;
}
if (server_side_mss_enable)
{
restore->server.mss = server_side_mss_value;
}
return 0;
}
static int tcp_restore_set_from_cmsg(struct tfe_cmsg *cmsg, struct tcp_restore_info *restore_info)
{
int ret = 0;
uint16_t length = 0;
uint32_t seq;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_SEQ, (unsigned char *)&seq, sizeof(uint32_t), &length);
if (ret == 0)
{
restore_info->client.seq = ntohl(seq);
restore_info->server.ack = ntohl(seq);
}
uint32_t ack;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_ACK, (unsigned char *)&ack, sizeof(uint32_t), &length);
if (ret == 0)
{
restore_info->client.ack = ntohl(ack);
restore_info->server.seq = ntohl(ack);
}
uint8_t ts_client;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT, (unsigned char *)&ts_client, sizeof(uint8_t), &length);
if (ret == 0)
{
restore_info->client.timestamp_perm = !!ts_client;
}
uint8_t ts_server;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER, (unsigned char *)&ts_server, sizeof(uint8_t), &length);
if (ret == 0)
{
restore_info->server.timestamp_perm = !!ts_server;
}
uint32_t ts_client_val;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT_VAL, (unsigned char *)&ts_client_val, sizeof(uint32_t), &length);
if (ret == 0)
{
restore_info->client.ts_val = ntohl(ts_client_val);
}
uint32_t ts_server_val;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER_VAL, (unsigned char *)&ts_server_val, sizeof(uint32_t), &length);
if (ret == 0)
{
restore_info->server.ts_val = ntohl(ts_server_val);
}
uint8_t wsacle_client;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, (unsigned char *)&wsacle_client, sizeof(uint8_t), &length);
if (ret == 0)
{
restore_info->client.wscale_perm = true;
restore_info->client.wscale = wsacle_client;
}
uint8_t wsacle_server;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (unsigned char *)&wsacle_server, sizeof(uint8_t), &length);
if (ret == 0)
{
restore_info->server.wscale_perm = true;
restore_info->server.wscale = wsacle_server;
}
uint8_t sack_client;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_SACK_CLIENT, (unsigned char *)&sack_client, sizeof(uint8_t), &length);
if (ret == 0)
{
restore_info->client.sack_perm = !!sack_client;
}
uint8_t sack_server;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_SACK_SERVER, (unsigned char *)&sack_server, sizeof(uint8_t), &length);
if (ret == 0)
{
restore_info->server.sack_perm = !!sack_server;
}
uint16_t mss_client;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_MSS_CLIENT, (unsigned char *)&mss_client, sizeof(uint16_t), &length);
if (ret == 0)
{
restore_info->client.mss = mss_client;
}
uint16_t mss_server;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_MSS_SERVER, (unsigned char *)&mss_server, sizeof(uint16_t), &length);
if (ret == 0)
{
restore_info->server.mss = mss_server;
}
uint16_t window_client;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT, (unsigned char *)&window_client, sizeof(uint16_t), &length);
if (ret == 0)
{
restore_info->client.window = window_client;
}
uint16_t window_server;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_SERVER, (unsigned char *)&window_server, sizeof(uint16_t), &length);
if (ret == 0)
{
restore_info->server.window = window_server;
}
uint8_t packet_cur_dir;
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_INFO_PACKET_CUR_DIR, (unsigned char *)&packet_cur_dir, sizeof(uint8_t), &length);
if (ret == 0)
{
restore_info->cur_dir = (enum tcp_restore_pkt_dir)packet_cur_dir;
}
return 0;
}
static int tcp_restore_set_from_pkg(struct tuple4 *tuple4, struct tcp_restore_info *restore_info)
{
if (tuple4->ip_type == IP_TYPE_V4)
{
struct sockaddr_in *in_addr_client;
struct sockaddr_in *in_addr_server;
if (restore_info->cur_dir == PKT_DIR_NOT_SET || restore_info->cur_dir == PKT_DIR_C2S)
{
in_addr_client = (struct sockaddr_in *)&restore_info->client.addr;
in_addr_server = (struct sockaddr_in *)&restore_info->server.addr;
}
else
{
in_addr_client = (struct sockaddr_in *)&restore_info->server.addr;
in_addr_server = (struct sockaddr_in *)&restore_info->client.addr;
}
in_addr_client->sin_family = AF_INET;
in_addr_client->sin_addr = tuple4->src_addr.v4;
in_addr_client->sin_port = tuple4->src_port;
in_addr_server->sin_family = AF_INET;
in_addr_server->sin_addr = tuple4->dst_addr.v4;
in_addr_server->sin_port = tuple4->dst_port;
}
if (tuple4->ip_type == IP_TYPE_V6)
{
struct sockaddr_in6 *in6_addr_client;
struct sockaddr_in6 *in6_addr_server;
if (restore_info->cur_dir == PKT_DIR_NOT_SET || restore_info->cur_dir == PKT_DIR_C2S)
{
in6_addr_client = (struct sockaddr_in6 *)&restore_info->client.addr;
in6_addr_server = (struct sockaddr_in6 *)&restore_info->server.addr;
}
else
{
in6_addr_client = (struct sockaddr_in6 *)&restore_info->server.addr;
in6_addr_server = (struct sockaddr_in6 *)&restore_info->client.addr;
}
in6_addr_client->sin6_family = AF_INET6;
in6_addr_client->sin6_addr = tuple4->src_addr.v6;
in6_addr_client->sin6_port = tuple4->src_port;
in6_addr_server->sin6_family = AF_INET6;
in6_addr_server->sin6_addr = tuple4->dst_addr.v6;
in6_addr_server->sin6_port = tuple4->dst_port;
}
return 0;
}
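// Assembly order used in handle_session_opening() below, before the repair sockets are
// created (a summary of the existing calls, not new behaviour):
//   tcp_restore_set_from_cmsg(parser->cmsg, &restore_info);  // seq/ack, TCP options, cur_dir
//   tcp_restore_set_from_pkg(&inner_tuple4, &restore_info);  // endpoint addresses by direction
//   overwrite_tcp_mss(parser->cmsg, &restore_info, session_id, logger); // optional MSS override
//   tfe_tcp_restore_fd_create(&restore_info.client, &restore_info.server, dev_tap, so_mask);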
// return 0 : success
// return -1 : error
static int packet_io_config(const char *profile, struct config *config, void *logger)
{
int ret = 0;
MESA_load_profile_int_def(profile, "PACKET_IO", "bypass_all_traffic", (int *)&config->bypass_all_traffic, 0);
MESA_load_profile_int_def(profile, "PACKET_IO", "rx_burst_max", (int *)&(config->rx_burst_max), 1);
MESA_load_profile_string_nodef(profile, "PACKET_IO", "app_symbol", config->app_symbol, sizeof(config->app_symbol));
MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_nf_interface", config->dev_nf_interface, sizeof(config->dev_nf_interface));
MESA_load_profile_string_def(profile, "PACKET_IO", "tap_name", config->dev_tap, sizeof(config->dev_tap), "tap0");
MESA_load_profile_int_nodef(profile, "PACKET_IO", "tap_allow_mutilthread", &config->tap_allow_mutilthread);
MESA_load_profile_string_nodef(profile, "PACKET_IO", "bpf_obj", config->bpf_obj, sizeof(config->bpf_obj));
MESA_load_profile_int_nodef(profile, "PACKET_IO", "bpf_debug_log", (int *)&config->bpf_debug_log);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "bpf_hash_mode", (int *)&config->bpf_hash_mode);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "tap_rps_enable", &config->tap_rps_enable);
MESA_load_profile_string_nodef(profile, "PACKET_IO", "tap_rps_mask", config->tap_rps_mask, sizeof(config->tap_rps_mask));
MESA_load_profile_int_nodef(profile, "PACKET_IO", "enable_iouring", &config->enable_iouring);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "enable_debuglog", &config->enable_debuglog);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "ring_size", &config->ring_size);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "buff_size", &config->buff_size);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "flags", &config->flags);
MESA_load_profile_int_nodef(profile, "PACKET_IO", "sq_thread_idle", &config->sq_thread_idle);
MESA_load_profile_string_def(profile, "traffic_steering", "device_client", config->dev_tap_c, sizeof(config->dev_tap_c), "tap_c");
MESA_load_profile_string_def(profile, "traffic_steering", "device_server", config->dev_tap_s, sizeof(config->dev_tap_s), "tap_s");
char src_mac_addr[18] = {0};
ret = MESA_load_profile_string_nodef(profile, "PACKET_IO", "src_mac_addr", src_mac_addr, sizeof(src_mac_addr));
if(ret < 0){
TFE_LOG_ERROR(logger, "%s: invalid src_mac_addr: src_mac_addr not set, profile = %s, section = PACKET_IO", LOG_TAG_PKTIO, profile);
return -1;
}
str_to_mac(src_mac_addr, config->src_mac);
ret = get_mac_by_device_name(config->dev_tap, config->tap_mac);
if (ret != 0) {
TFE_LOG_ERROR(logger, "%s: invalid tap_name: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap);
return -1;
}
ret = get_mac_by_device_name(config->dev_tap_c, config->tap_c_mac);
if (ret != 0) {
TFE_LOG_ERROR(logger, "%s: invalid device_client: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap_c);
return -1;
}
ret = get_mac_by_device_name(config->dev_tap_s, config->tap_s_mac);
if (ret != 0) {
TFE_LOG_ERROR(logger, "%s: invalid device_server: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap_s);
return -1;
}
if (config->rx_burst_max > RX_BURST_MAX)
{
TFE_LOG_ERROR(logger, "%s: invalid rx_burst_max, exceeds limit %d", LOG_TAG_PKTIO, RX_BURST_MAX);
return -1;
}
if (strlen(config->app_symbol) == 0)
{
TFE_LOG_ERROR(logger, "%s: invalid app_symbol in %s", LOG_TAG_PKTIO, profile);
return -1;
}
if (strlen(config->dev_nf_interface) == 0)
{
TFE_LOG_ERROR(logger, "%s: invalid dev_nf_interface in %s", LOG_TAG_PKTIO, profile);
return -1;
}
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->bypass_all_traffic : %d", LOG_TAG_PKTIO, config->bypass_all_traffic);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->rx_burst_max : %d", LOG_TAG_PKTIO, config->rx_burst_max);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->app_symbol : %s", LOG_TAG_PKTIO, config->app_symbol);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->dev_nf_interface : %s", LOG_TAG_PKTIO, config->dev_nf_interface);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->tap_name : %s", LOG_TAG_PKTIO, config->tap_rps_mask);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->tap_allow_mutilthread : %d", LOG_TAG_PKTIO, config->tap_allow_mutilthread);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->bpf_obj : %s", LOG_TAG_PKTIO, config->bpf_obj);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->bpf_debug_log : %d", LOG_TAG_PKTIO, config->bpf_debug_log);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->bpf_hash_mode : %d", LOG_TAG_PKTIO, config->bpf_hash_mode);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->tap_rps_enable : %d", LOG_TAG_PKTIO, config->tap_rps_enable);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->tap_rps_mask : %s", LOG_TAG_PKTIO, config->tap_rps_mask);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->enable_iouring : %d", LOG_TAG_PKTIO, config->enable_iouring);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->enable_debuglog : %d", LOG_TAG_PKTIO, config->enable_debuglog);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->ring_size : %d", LOG_TAG_PKTIO, config->ring_size);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->buff_size : %d", LOG_TAG_PKTIO, config->buff_size);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->flags : %d", LOG_TAG_PKTIO, config->flags);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->sq_thread_idle : %d", LOG_TAG_PKTIO, config->sq_thread_idle);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->device_client : %s", LOG_TAG_PKTIO, config->dev_tap_c);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->device_server : %s", LOG_TAG_PKTIO, config->dev_tap_s);
TFE_LOG_DEBUG(logger, "%s: PACKET_IO->src_mac_addr : %s", LOG_TAG_PKTIO, src_mac_addr);
return 0;
}
// return 0 : success
// return -1 : error
static int packet_io_get_metadata(marsio_buff_t *rx_buff, struct metadata *meta, void *logger)
{
memset(meta, 0, sizeof(struct metadata));
if (marsio_buff_get_metadata(rx_buff, MR_BUFF_SESSION_ID, &(meta->session_id), sizeof(meta->session_id)) <= 0)
{
TFE_LOG_ERROR(logger, "%s: unable to get session_id from metadata", LOG_TAG_PKTIO);
return -1;
}
meta->raw_len = marsio_buff_datalen(rx_buff);
meta->raw_data = marsio_buff_mtod(rx_buff);
if (meta->raw_data == NULL || meta->raw_len == 0)
{
TFE_LOG_ERROR(logger, "%s: unable to get raw_data from metadata", LOG_TAG_PKTIO);
return -1;
}
// 1: E2I
// 0: I2E
if (marsio_buff_get_metadata(rx_buff, MR_BUFF_DIR, &(meta->is_e2i_dir), sizeof(meta->is_e2i_dir)) <= 0)
{
TFE_LOG_ERROR(logger, "%s: unable to get buff_dir from metadata", LOG_TAG_PKTIO);
return -1;
}
if (marsio_buff_is_ctrlbuf(rx_buff))
{
meta->is_ctrl_pkt = 1;
// only control packet set MR_BUFF_PAYLOAD_OFFSET
if (marsio_buff_get_metadata(rx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7offset), sizeof(meta->l7offset)) <= 0)
{
TFE_LOG_ERROR(logger, "%s: unable to get l7offset from metadata", LOG_TAG_PKTIO);
return -1;
}
}
else
{
meta->is_ctrl_pkt = 0;
uint16_t user_data = 0;
if (marsio_buff_get_metadata(rx_buff, MR_BUFF_USER_0, &(user_data), sizeof(user_data)) <= 0)
{
TFE_LOG_ERROR(logger, "%s: unable to get is_decrypted from metadata", LOG_TAG_PKTIO);
return -1;
}
if (user_data & TRAFFIC_IS_DECRYPTED)
{
meta->is_decrypted = 1;
}
else
{
meta->is_decrypted = 0;
}
}
meta->route_ctx.len = marsio_buff_get_metadata(rx_buff, MR_BUFF_ROUTE_CTX, meta->route_ctx.data, sizeof(meta->route_ctx.data));
if (meta->route_ctx.len <= 0)
{
TFE_LOG_ERROR(logger, "%s: unable to get route_ctx from metadata", LOG_TAG_PKTIO);
return -1;
}
meta->sids.num = marsio_buff_get_sid_list(rx_buff, meta->sids.elems, sizeof(meta->sids.elems) / sizeof(meta->sids.elems[0]));
if (meta->sids.num < 0)
{
TFE_LOG_ERROR(logger, "%s: unable to get sid_list from metadata", LOG_TAG_PKTIO);
return -1;
}
return 0;
}
// return 0 : success
// return -1 : error
static int packet_io_set_metadata(marsio_buff_t *tx_buff, struct metadata *meta, void *logger)
{
if (meta->session_id)
{
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_SESSION_ID, &(meta->session_id), sizeof(meta->session_id)) != 0)
{
TFE_LOG_ERROR(logger, "%s: unable to set session_id for metadata", LOG_TAG_PKTIO);
return -1;
}
}
if (meta->is_ctrl_pkt)
{
marsio_buff_set_ctrlbuf(tx_buff);
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7offset), sizeof(meta->l7offset)) != 0)
{
TFE_LOG_ERROR(logger, "%s: unable to set l7offset for metadata", LOG_TAG_PKTIO);
return -1;
}
}
else
{
uint16_t user_data = 0;
if (meta->is_decrypted)
{
user_data = SET_TRAFFIC_IS_DECRYPTED(user_data);
}
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_USER_0, &(user_data), sizeof(user_data)) != 0)
{
TFE_LOG_ERROR(logger, "%s: unable to set is_decrypted for metadata", LOG_TAG_PKTIO);
return -1;
}
}
if (meta->route_ctx.len > 0)
{
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_ROUTE_CTX, meta->route_ctx.data, meta->route_ctx.len) != 0)
{
TFE_LOG_ERROR(logger, "%s: unable to set route_ctx for metadata", LOG_TAG_PKTIO);
return -1;
}
}
if (meta->sids.num > 0)
{
if (marsio_buff_set_sid_list(tx_buff, meta->sids.elems, meta->sids.num) != 0)
{
TFE_LOG_ERROR(logger, "%s: unable to set sid_list for metadata", LOG_TAG_PKTIO);
return -1;
}
}
return 0;
}
__attribute__((unused)) static void packet_io_dump_metadata(struct metadata *meta, void *logger)
{
TFE_LOG_ERROR(logger, "%s: META={session_id: %lu, raw_len: %d, is_e2i_dir: %d, is_ctrl_pkt: %d, l7offset: %d, is_decrypted: %u, sids_num: %d}", LOG_TAG_PKTIO, meta->session_id, meta->raw_len, meta->is_e2i_dir, meta->is_ctrl_pkt, meta->l7offset, meta->is_decrypted, meta->sids.num);
}
/*
{
"tsync": "2.0",
"session_id": "123456789",
"state": "active",
"method": "log_update",
"params": {
"proxy": {
"ssl_intercept_info": {
mpack array
}
}
}
}
*/
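/*
The ssl_intercept_info array built below is positional (see the mpack_write_* calls that
follow): [ [rule_id, ...], intercepted (0/1), intercept_status, upstream_latency,
downstream_latency, upstream_version, downstream_version, pinning_state, cert_verify,
error, passthrough_reason ].
*/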
static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
marsio_buff_t *tx_buffs[1];
struct metadata meta = {0};
void * logger = thread->logger;
int ret = 0;
int do_log = 0;
uint8_t hit_no_intercept = 0;
uint64_t rule_id = 0;
uint16_t length = 0;
uint8_t ssl_intercept_status = 0;
uint64_t ssl_upstream_latency = 0;
uint64_t ssl_downstream_latency = 0;
char ssl_upstream_version[64] = {0};
uint16_t ssl_upstream_version_length = 0;
char ssl_downstream_version[64] = {0};
uint16_t ssl_downstream_version_length = 0;
uint8_t ssl_cert_verify = 0;
char ssl_error[64] = {0};
uint16_t ssl_error_length = 0;
char ssl_passthrough_reason[32] = {0};
uint16_t ssl_passthrough_reason_length = 0;
uint8_t ssl_pinning_state = 0;
char *data = NULL;
size_t size;
mpack_writer_t writer;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_POLICY_DO_LOG, (unsigned char *)&do_log, sizeof(do_log), &length);
if (ret < 0 || do_log == 0)
return;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_POLICY_ID, (unsigned char *)&rule_id, sizeof(rule_id), &length);
if (ret < 0)
return;
ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept), &length);
if (ret < 0)
return;
mpack_writer_init_growable(&writer, &data, &size);
// root map
mpack_build_map(&writer);
mpack_write_cstr(&writer, "tsync");
mpack_write_cstr(&writer, "2.0");
mpack_write_cstr(&writer, "session_id");
mpack_write_u64(&writer, s_ctx->session_id);
mpack_write_cstr(&writer, "state");
mpack_write_cstr(&writer, "active");
mpack_write_cstr(&writer, "method");
mpack_write_cstr(&writer, "log_update");
mpack_write_cstr(&writer, "params");
// params map
mpack_build_map(&writer);
mpack_write_cstr(&writer, "proxy");
// proxy map
mpack_build_map(&writer);
mpack_write_cstr(&writer, "ssl_intercept_info");
mpack_build_array(&writer);
// proxy rule list
mpack_build_array(&writer);
mpack_write_u64(&writer, rule_id);
mpack_complete_array(&writer);
tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, (unsigned char *)&ssl_intercept_status, sizeof(ssl_intercept_status), &length);
tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_LATENCY, (unsigned char *)&ssl_upstream_latency, sizeof(ssl_upstream_latency), &length);
tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_LATENCY, (unsigned char *)&ssl_downstream_latency, sizeof(ssl_downstream_latency), &length);
tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_VERSION, (unsigned char *)ssl_upstream_version, sizeof(ssl_upstream_version), &ssl_upstream_version_length);
tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_VERSION, (unsigned char *)ssl_downstream_version, sizeof(ssl_downstream_version), &ssl_downstream_version_length);
tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&ssl_pinning_state, sizeof(ssl_pinning_state), &length);
tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CERT_VERIFY, (unsigned char *)&ssl_cert_verify, sizeof(ssl_cert_verify), &length);
tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_ERROR, (unsigned char *)ssl_error, sizeof(ssl_error), &ssl_error_length);
tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PASSTHROUGH_REASON, (unsigned char *)ssl_passthrough_reason, sizeof(ssl_passthrough_reason), &ssl_passthrough_reason_length);
mpack_write_u8(&writer, hit_no_intercept?0:1);
mpack_write_u8(&writer, ssl_intercept_status);
mpack_write_u64(&writer, ssl_upstream_latency);
mpack_write_u64(&writer, ssl_downstream_latency);
mpack_write_str(&writer, ssl_upstream_version, ssl_upstream_version_length);
mpack_write_str(&writer, ssl_downstream_version, ssl_downstream_version_length);
mpack_write_u8(&writer, ssl_pinning_state);
mpack_write_u8(&writer, ssl_cert_verify);
mpack_write_str(&writer, ssl_error, ssl_error_length);
mpack_write_str(&writer, ssl_passthrough_reason, ssl_passthrough_reason_length);
mpack_complete_array(&writer);
// proxy map end
mpack_complete_map(&writer);
// params map end
mpack_complete_map(&writer);
// root map end
mpack_complete_map(&writer);
// finish writing
if (mpack_writer_destroy(&writer) != mpack_ok)
{
if (data)
{
free(data);
data = NULL;
}
return;
}
char *raw_packet_header_data = s_ctx->ctrl_meta->raw_data;
int raw_packet_header_len = s_ctx->ctrl_meta->l7offset;
marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread_seq);
char *dst = marsio_buff_append(tx_buffs[0], raw_packet_header_len + size);
if (dst == NULL)
{
TFE_LOG_ERROR(logger, "%s: unable to send log_update packet, get marsio buff is NULL.", LOG_TAG_PKTIO);
if (data)
free(data);
return;
}
memcpy(dst, raw_packet_header_data, raw_packet_header_len);
memcpy(dst + raw_packet_header_len, data, size);
meta.session_id = s_ctx->session_id;
meta.l7offset = raw_packet_header_len;
meta.is_ctrl_pkt = 1;
meta.sids.num = 1;
meta.sids.elems[0] = acceptor_ctx->firewall_sids;
route_ctx_copy(&meta.route_ctx, &s_ctx->ctrl_meta->route_ctx);
packet_io_set_metadata(tx_buffs[0], &meta, logger);
ATOMIC_INC(&(packet_io_fs->session_log));
marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_seq, tx_buffs, 1);
if (data)
free(data);
return;
}
static void tcp_restore_info_dump(struct tcp_restore_info *info, uint64_t session_id, void *logger)
{
char str_client_addr[64] = { 0 };
char str_server_addr[64] = { 0 };
const struct tcp_restore_endpoint *client = &info->client;
const struct tcp_restore_endpoint *server = &info->server;
assert(client->addr.ss_family == server->addr.ss_family);
if (client->addr.ss_family == AF_INET)
{
struct sockaddr_in *sk_client = (struct sockaddr_in *)&client->addr;
struct sockaddr_in *sk_server = (struct sockaddr_in *)&server->addr;
uint16_t port_client = ntohs(sk_client->sin_port);
uint16_t port_server = ntohs(sk_server->sin_port);
inet_ntop(AF_INET, &sk_client->sin_addr, str_client_addr, sizeof(str_client_addr));
inet_ntop(AF_INET, &sk_server->sin_addr, str_server_addr, sizeof(str_server_addr));
TFE_LOG_DEBUG(logger, "restore_info session %lu %s:%hu %s:%hu: cur_dir=%u\n"
"\tclient={ addr=%s, port=%hu, seq:%u, ack:%u, ts_val:%u, mss=%u, window:%hu, wscale_perm=%u, wscale=%u, timestamp_perm=%u, sack_perm=%u }\n"
"\tserver={ addr=%s, port=%hu, seq:%u, ack:%u, ts_val:%u, mss=%u, window:%hu, wscale_perm=%u, wscale=%u, timestamp_perm=%u, sack_perm=%u }",
session_id, str_client_addr, port_client, str_server_addr, port_server, info->cur_dir,
str_client_addr, port_client, client->seq, client->ack, client->ts_val, client->mss, client->window, (client->wscale_perm ? 1 : 0), client->wscale, (client->timestamp_perm ? 1 : 0), (client->sack_perm ? 1 : 0),
str_server_addr, port_server, server->seq, server->ack, server->ts_val, server->mss, server->window, (server->wscale_perm ? 1 : 0), server->wscale, (server->timestamp_perm ? 1 : 0), (server->sack_perm ? 1 : 0));
}
else if (client->addr.ss_family == AF_INET6)
{
struct sockaddr_in6 *sk_client = (struct sockaddr_in6 *)&client->addr;
struct sockaddr_in6 *sk_server = (struct sockaddr_in6 *)&server->addr;
uint16_t port_client = ntohs(sk_client->sin6_port);
uint16_t port_server = ntohs(sk_server->sin6_port);
inet_ntop(AF_INET6, &sk_client->sin6_addr, str_client_addr, sizeof(str_client_addr));
inet_ntop(AF_INET6, &sk_server->sin6_addr, str_server_addr, sizeof(str_server_addr));
TFE_LOG_DEBUG(logger, "restore_info session %lu %s:%hu %s:%hu: tcp_restore_info %p cur_dir=%u\n"
"\tclient={ addr=%s, port=%hu, seq:%u, ack:%u, ts_val:%u, mss=%u, window:%hu, wscale_perm=%u, wscale=%u, timestamp_perm=%u, sack_perm=%u }\n"
"\tserver={ addr=%s, port=%hu, seq:%u, ack:%u, ts_val:%u, mss=%u, window:%hu, wscale_perm=%u, wscale=%u, timestamp_perm=%u, sack_perm=%u }",
session_id, str_client_addr, port_client, str_server_addr, port_server, info, info->cur_dir,
str_client_addr, port_client, client->seq, client->ack, client->ts_val, client->mss, client->window, (client->wscale_perm ? 1 : 0), client->wscale, (client->timestamp_perm ? 1 : 0), (client->sack_perm ? 1 : 0),
str_server_addr, port_server, server->seq, server->ack, server->ts_val, server->mss, server->window, (server->wscale_perm ? 1 : 0), server->wscale, (server->timestamp_perm ? 1 : 0), (server->sack_perm ? 1 : 0));
}
}
static void set_passthrough_reason(struct tfe_cmsg *cmsg, char *reason)
{
uint8_t ssl_intercept_status = SSL_ACTION_PASSTHROUGH;
tfe_cmsg_set(cmsg, TFE_CMSG_SSL_PASSTHROUGH_REASON, (const unsigned char *)reason, strlen(reason));
tfe_cmsg_set(cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, (const unsigned char *)&ssl_intercept_status, (uint16_t)sizeof(ssl_intercept_status));
tfe_cmsg_set_flag(cmsg, TFE_CMSG_FLAG_USER0);
}
typedef int tcp_handshake_fn(struct tcp_restore_info *info, struct ether_addr *client_mac, struct ether_addr *server_mac, char *buffer, int size);
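// Replays a synthetic TCP three-way handshake (SYN, SYN/ACK, ACK built by the fn[] table
// below) onto the NF send path so the decrypted-traffic steering devices observe a full
// session setup; the SYN and ACK carry the c2s route context, the SYN/ACK the s2c one.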
static void packet_io_send_fake_pkt(struct packet_io_thread_ctx *thread, struct tcp_restore_info *info, uint64_t session_id, struct route_ctx *c2s_route_ctx, struct route_ctx *s2c_route_ctx)
{
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
struct ether_addr *client_mac = (struct ether_addr *)&packet_io->config.tap_c_mac;
struct ether_addr *server_mac = (struct ether_addr *)&packet_io->config.tap_s_mac;
void *logger = thread->logger;
char buffer[1500];
struct metadata meta = {0};
meta.session_id = session_id;
meta.is_decrypted = SET_TRAFFIC_IS_DECRYPTED(0);
meta.is_ctrl_pkt = 0;
meta.l7offset = 0;
meta.sids.num = 2;
meta.sids.elems[0] = acceptor_ctx->sce_sids;
meta.sids.elems[1] = acceptor_ctx->proxy_sids;
static tcp_handshake_fn *fn[3] = {tfe_tcp_restore_syn_packet, tfe_tcp_restore_synack_packet, tfe_tcp_restore_ack_packet};
marsio_buff_t *tx_buffs[3];
marsio_buff_malloc_global(packet_io->instance, tx_buffs, 3, 0, thread->thread_index);
for (int i = 0; i < 3; i++)
{
meta.raw_len = fn[i](info, client_mac, server_mac, buffer, sizeof(buffer));
meta.raw_data = marsio_buff_append(tx_buffs[i], meta.raw_len);
if (meta.raw_data == NULL)
{
TFE_LOG_ERROR(logger, "%s: unable to send fake packet, get marsio buff is NULL.", LOG_TAG_PKTIO);
continue;
}
memcpy(meta.raw_data, buffer, meta.raw_len);
switch (i)
{
case 0: /* fall through */
case 2:
route_ctx_copy(&meta.route_ctx, c2s_route_ctx);
break;
case 1:
route_ctx_copy(&meta.route_ctx, s2c_route_ctx);
break;
}
packet_io_set_metadata(tx_buffs[i], &meta, logger);
throughput_metrics_inc(&packet_io_fs->decrypt_tx, 1, meta.raw_len);
}
marsio_send_burst_with_options(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 3, MARSIO_SEND_OPT_REHASH);
}
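// return 0 : success (pkt_info->header_data is heap-allocated; released in session_ctx_free)
// return -1 : error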
int raw_traffic_decapsulate(const char *raw_data, int raw_len, struct packet_info *pkt_info)
{
const struct layer_record *l3_layer_record = NULL;
struct packet pkt;
packet_parse(&pkt, raw_data, raw_len);
l3_layer_record = packet_get_innermost_layer(&pkt, LAYER_TYPE_L3);
if (l3_layer_record == NULL)
return -1;
pkt_info->is_ipv4 = l3_layer_record->type == LAYER_TYPE_IPV4 ? 1 : 0;
pkt_info->header_len = l3_layer_record->hdr_offset;
pkt_info->header_data = (char *)calloc(pkt_info->header_len, sizeof(char));
memcpy(pkt_info->header_data, raw_data, pkt_info->header_len);
return 0;
}
// return 0 : success
// return -1 : error
static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
int ret = 0;
int fd_downstream = 0;
int fd_upstream = 0;
int fd_fake_c = 0;
int fd_fake_s = 0;
uint16_t size = 0;
uint8_t is_passthrough = 0;
uint8_t hit_no_intercept = 0;
uint8_t is_session_id_only_key = 0;
uint16_t out_size = 0;
char stream_traceid[24] = {0};
char reason_no_intercept_param[] = "Hit No Intercept Policy";
char reason_invalid_intercept_param[] = "Invalid Intercept Param";
char reason_invalid_tcp_policy_param[] = "Invalid tcp policy Param";
char reason_underlying_stream_error[] = "Underlying Stream Error";
unsigned int stream_common_direction;
uint8_t stream_protocol_in_char = 0;
uint8_t enable_decrypted_traffic_steering = 0;
struct session_ctx *s_ctx = NULL;
struct tuple4 inner_tuple4;
struct tcp_restore_info restore_info;
memset(&inner_tuple4, 0, sizeof(inner_tuple4));
memset(&restore_info, 0, sizeof(restore_info));
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct packet_io *packet_io = thread->ref_io;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
void * logger = thread->logger;
struct packet pkt;
const void *payload = packet_parse(&pkt, (const char *)meta->raw_data, meta->raw_len);
if ((char *)payload - meta->raw_data != meta->l7offset)
{
uint16_t offset = (char *)payload - meta->raw_data;
TFE_LOG_ERROR(logger, "%s: incorrect dataoffset in the control zone of session %lu, offset:%u, l7offset:%u, payload:%p, raw_data:%p", LOG_TAG_PKTIO, meta->session_id, offset, meta->l7offset, payload, meta->raw_data);
}
packet_get_innermost_tuple4(&pkt, &inner_tuple4);
tfe_cmsg_get_value(parser->cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (unsigned char *)&stream_protocol_in_char, sizeof(stream_protocol_in_char), &size);
uint64_t rule_id = 0;
ret = intercept_policy_select(thread->ref_proxy->int_ply_enforcer, parser->tfe_policy_ids, parser->tfe_policy_id_num, &rule_id);
if (ret != 0)
{
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_intercept_param);
goto passthrough;
}
tfe_cmsg_set(parser->cmsg, TFE_CMSG_POLICY_ID, (const unsigned char *)&rule_id, sizeof(uint64_t));
ret = intercept_policy_enforce(thread->ref_proxy->int_ply_enforcer, parser->cmsg);
if (ret != 0) {
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_intercept_param);
goto passthrough;
}
tfe_cmsg_get_value(parser->cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept), &out_size);
if (hit_no_intercept == 1) {
is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->hit_no_intercept_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_no_intercept_param);
goto passthrough;
}
__atomic_fetch_add(&packet_io_fs->hit_intercept_num, 1, __ATOMIC_RELAXED);
if (parser->intercpet_data == 0) {
ret = tcp_policy_enforce(thread->ref_proxy->tcp_ply_enforcer, parser->cmsg);
if (ret != 0) {
is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
goto passthrough;
}
for (int i = 0; i < parser->sce_policy_id_num; i++) {
chaining_policy_enforce(thread->ref_proxy->chain_ply_enforcer, parser->cmsg, parser->sce_policy_ids[i]);
}
tcp_restore_set_from_cmsg(parser->cmsg, &restore_info);
tcp_restore_set_from_pkg(&inner_tuple4, &restore_info);
if (overwrite_tcp_mss(parser->cmsg, &restore_info, meta->session_id, logger)) {
is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
goto passthrough;
}
tcp_restore_info_dump(&restore_info, meta->session_id, logger);
// tcp repair C2S
fd_upstream = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), packet_io->config.dev_tap, 0x65);
if (fd_upstream < 0) {
TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(UPSTREAM)", LOG_TAG_PKTIO, meta->session_id);
is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
goto passthrough;
}
// tcp repair S2C
fd_downstream = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client), packet_io->config.dev_tap, 0x65);
if (fd_downstream < 0)
{
TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(DOWNSTREAM)", LOG_TAG_PKTIO, meta->session_id);
is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
goto passthrough;
}
tfe_cmsg_get_value(parser->cmsg, TFE_CMSG_TCP_DECRYPTED_TRAFFIC_STEERING, (unsigned char *)&enable_decrypted_traffic_steering, sizeof(enable_decrypted_traffic_steering), &size);
if ((STREAM_PROTO_PLAIN == (enum tfe_stream_proto)stream_protocol_in_char && thread->ref_proxy->traffic_steering_options.enable_steering_http) ||
(STREAM_PROTO_SSL == (enum tfe_stream_proto)stream_protocol_in_char && thread->ref_proxy->traffic_steering_options.enable_steering_ssl) ||
enable_decrypted_traffic_steering == 1)
{
packet_io_send_fake_pkt(thread, &restore_info, meta->session_id, &parser->seq_route_ctx, &parser->ack_route_ctx);
fd_fake_c = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), thread->ref_proxy->traffic_steering_options.device_client, thread->ref_proxy->traffic_steering_options.so_mask_client);
if (fd_fake_c < 0)
{
TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(fd_fake_c)", LOG_TAG_PKTIO, meta->session_id);
is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
goto passthrough;
}
fd_fake_s = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client), thread->ref_proxy->traffic_steering_options.device_server, thread->ref_proxy->traffic_steering_options.so_mask_server);
if (fd_fake_s < 0)
{
TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(fd_fake_s)", LOG_TAG_PKTIO, meta->session_id);
is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
goto passthrough;
}
}
stream_common_direction = meta->is_e2i_dir ? 'I' : 'E';
tfe_cmsg_set(parser->cmsg, TFE_CMSG_COMMON_DIRECTION, (const unsigned char *)&stream_common_direction, sizeof(stream_common_direction));
snprintf(stream_traceid, 24, "%" PRIu64, meta->session_id);
tfe_cmsg_set(parser->cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char *)stream_traceid, strlen(stream_traceid));
tfe_cmsg_dup(parser->cmsg);
// To avoid a race between the packet IO thread and the worker when they access the cmsg,
// the packet IO thread must set the cmsg before calling tfe_proxy_fds_accept().
if (tfe_proxy_fds_accept(thread->ref_proxy, fd_downstream, fd_upstream, fd_fake_c, fd_fake_s, parser->cmsg) < 0)
{
TFE_LOG_ERROR(logger, "%s: session %lu Failed at tfe_proxy_fds_accept()", LOG_TAG_PKTIO, meta->session_id);
is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
goto passthrough;
}
__atomic_fetch_add(&packet_io_fs->can_intercept_num, 1, __ATOMIC_RELAXED);
}
else if (parser->intercpet_data & (IS_SINGLE | IS_TUNNEL)) {
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_underlying_stream_error);
if (parser->intercpet_data & IS_SINGLE) {
__atomic_fetch_add(&packet_io_fs->asymmetric_num, 1, __ATOMIC_RELAXED);
}
else if (parser->intercpet_data & IS_TUNNEL) {
__atomic_fetch_add(&packet_io_fs->tunnel_num, 1, __ATOMIC_RELAXED);
}
}
passthrough:
s_ctx = session_ctx_new();
s_ctx->ctrl_meta = metadata_new();
s_ctx->protocol = stream_protocol_in_char;
s_ctx->ref_thread_ctx = thread;
s_ctx->session_id = meta->session_id;
tuple4_tostring(&inner_tuple4, s_ctx->session_addr, sizeof(s_ctx->session_addr));
s_ctx->cmsg = parser->cmsg;
s_ctx->policy_ids = parser->tfe_policy_ids[0];
s_ctx->is_passthrough = is_passthrough;
metadata_deep_copy(s_ctx->ctrl_meta, meta);
sids_copy(&s_ctx->ctrl_meta->sids, &meta->sids);
route_ctx_copy(&s_ctx->ctrl_meta->route_ctx, &meta->route_ctx);
if (parser->seq_len > 0)
raw_traffic_decapsulate(parser->seq_header, parser->seq_len, &s_ctx->c2s_info);
if (parser->ack_len > 0)
raw_traffic_decapsulate(parser->ack_header, parser->ack_len, &s_ctx->s2c_info);
uint8_t packet_cur_dir = CURDIR_C2S;
tfe_cmsg_get_value(parser->cmsg, TFE_CMSG_TCP_RESTORE_INFO_PACKET_CUR_DIR, (unsigned char *)&packet_cur_dir, sizeof(uint8_t), &size);
if (packet_cur_dir == CURDIR_C2S) {
s_ctx->c2s_info.tuple4 = inner_tuple4;
tuple4_reverse(&inner_tuple4, &s_ctx->s2c_info.tuple4);
}
else {
s_ctx->s2c_info.tuple4 = inner_tuple4;
tuple4_reverse(&inner_tuple4, &s_ctx->c2s_info.tuple4);
}
// When is_passthrough is 1, key the session table by session id only, so that identical
// 4-tuples do not trigger frequent uthash expansion and crash the process.
if (is_passthrough)
is_session_id_only_key = 1;
// c2s
sids_copy(&s_ctx->c2s_info.sids, &parser->seq_sids);
route_ctx_copy(&s_ctx->c2s_info.route_ctx, &parser->seq_route_ctx);
// s2c
sids_copy(&s_ctx->s2c_info.sids, &parser->ack_sids);
route_ctx_copy(&s_ctx->s2c_info.route_ctx, &parser->ack_route_ctx);
TFE_LOG_INFO(logger, "%s: session %lu %s active first, hit rule %lu", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->session_addr, rule_id);
session_table_insert(thread->session_table, is_session_id_only_key, s_ctx->session_id, &(s_ctx->c2s_info.tuple4), s_ctx, session_value_free_cb);
ATOMIC_INC(&(packet_io_fs->session_num));
if (parser->seq_header)
FREE(&parser->seq_header);
if (parser->ack_header)
FREE(&parser->ack_header);
return 0;
}
// return 0 : success
// return -1 : error
static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id);
if (!node)
{
return handle_session_opening(meta, parser, thread_seq, ctx);
}
return 0;
}
// return 0 : success
// return -1 : error
static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
void * logger = thread->logger;
struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id);
if (node)
{
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id);
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 1);
session_table_delete_by_id(thread->session_table, meta->session_id);
ATOMIC_DEC(&(packet_io_fs->session_num));
return 0;
}
return -1;
}
// return 0 : success
// return -1 : error
static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
void * logger = thread->logger;
TFE_LOG_ERROR(logger, "%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id);
ATOMIC_ZERO(&(packet_io_fs->session_num));
for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++)
{
struct packet_io_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i];
__atomic_fetch_add(&thread_ctx->session_table_need_reset, 1, __ATOMIC_RELAXED);
}
return 0;
}
// return 0 : success
// return -1 : error
static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
void * logger = thread->logger;
struct metadata meta;
if (packet_io_get_metadata(rx_buff, &meta, logger) == -1)
{
TFE_LOG_ERROR(logger, "%s: unexpected control packet, unable to get metadata\n\tMETA={session_id: %lu, raw_len: %d, is_e2i_dir: %d, is_ctrl_pkt: %d, l7offset: %d, is_decrypted: %u, sids_num: %d}",
LOG_TAG_PKTIO, meta.session_id, meta.raw_len, meta.is_e2i_dir, meta.is_ctrl_pkt, meta.l7offset, meta.is_decrypted, meta.sids.num);
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED);
return -1;
}
struct ctrl_pkt_parser ctrl_parser;
ctrl_packet_parser_init(&ctrl_parser);
if (ctrl_packet_parser_parse(&ctrl_parser, meta.raw_data + meta.l7offset, meta.raw_len - meta.l7offset, logger) == -1)
{
TFE_LOG_ERROR(logger, "%s: unexpected control packet, metadata's session %lu unable to parse data", LOG_TAG_PKTIO, meta.session_id);
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED);
return -1;
}
if (ctrl_parser.session_id != meta.session_id)
{
TFE_LOG_ERROR(logger, "%s: unexpected control packet, metadata's session %lu != control packet's session %lu", LOG_TAG_PKTIO, meta.session_id, ctrl_parser.session_id);
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED);
ctrl_packet_cmsg_destroy(&ctrl_parser);
return -1;
}
switch (ctrl_parser.state)
{
case SESSION_STATE_OPENING:
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_opening_num, 1, __ATOMIC_RELAXED);
// when the session is opening, the firewall has not sent the policy id yet
// return handle_session_opening(&meta, &ctrl_parser, thread_seq, ctx);
break;
case SESSION_STATE_CLOSING:
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_closing_num, 1, __ATOMIC_RELAXED);
return handle_session_closing(&meta, &ctrl_parser, thread_seq, ctx);
case SESSION_STATE_ACTIVE:
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_active_num, 1, __ATOMIC_RELAXED);
return handle_session_active(&meta, &ctrl_parser, thread_seq, ctx);
case SESSION_STATE_RESETALL:
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_resetall_num, 1, __ATOMIC_RELAXED);
return handle_session_resetall(&meta, &ctrl_parser, thread_seq, ctx);
default:
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED);
break;
}
return 0;
}
static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
struct packet pkt;
struct tuple4 inner_addr;
int is_ipv4 = 0;
uint8_t flag = 0;
char *header = NULL;
int header_len = 0;
void * logger = thread->logger;
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
memset(&inner_addr, 0, sizeof(struct tuple4));
packet_parse(&pkt, (const char *)raw_data, raw_len);
packet_get_innermost_tuple4(&pkt, &inner_addr);
struct metadata meta;
if (packet_io_get_metadata(rx_buff, &meta, logger) == -1) {
TFE_LOG_ERROR(logger, "%s: unexpected control packet, unable to get metadata\n\tMETA={session_id: %lu, raw_len: %d, is_e2i_dir: %d, is_ctrl_pkt: %d, l7offset: %d, is_decrypted: %u, sids_num: %d}",
LOG_TAG_PKTIO, meta.session_id, meta.raw_len, meta.is_e2i_dir, meta.is_ctrl_pkt, meta.l7offset, meta.is_decrypted, meta.sids.num);
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1;
}
if (thread->ref_acceptor_ctx->dup_packet_filter_enable == 1)
{
if (search_packet_from_dablooms(&pkt, thread->dup_packet_filter) == 1)
{
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->dup_bypass, 1, raw_len);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1;
}
}
struct session_node *node = session_table_search_by_id(thread->session_table, meta.session_id);
if (node == NULL) {
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
if (thread->ref_acceptor_ctx->debug)
{
uint16_t ipid = 0;
const struct layer_record *ipv4_layer = packet_get_innermost_layer(&pkt, LAYER_TYPE_IPV4);
if (ipv4_layer)
{
ipid = ipv4_hdr_get_ipid((const struct ip *)ipv4_layer->hdr_ptr);
}
char buffer[128] = {0};
tuple4_tostring(&inner_addr, buffer, sizeof(buffer));
TFE_LOG_ERROR(logger, "packet from nf %lu: %s (ipid: %u) miss session table", meta.session_id, buffer, ipid);
}
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1;
}
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
if (s_ctx->is_passthrough > 0) {
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
s_ctx->c2s_info.is_e2i_dir = meta.is_e2i_dir;
throughput_metrics_inc(&s_ctx->c2s_info.rx, 1, raw_len);
}
else {
s_ctx->s2c_info.is_e2i_dir = meta.is_e2i_dir;
throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len);
}
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 0);
flag = tfe_cmsg_get_flag(s_ctx->cmsg);
if (flag & TFE_CMSG_FLAG_USER0) {
send_event_log(s_ctx, thread_seq, ctx);
tfe_cmsg_set_flag(s_ctx->cmsg, TFE_CMSG_FLAG_INIT);
}
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return 0;
}
if (meta.is_decrypted)
{
throughput_metrics_inc(&packet_io_fs->decrypt_rx, 1, raw_len);
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
add_ether_header(raw_data, packet_io->config.tap_c_mac, packet_io->config.tap_s_mac);
throughput_metrics_inc(&packet_io_fs->tap_s_pkt_tx, 1, raw_len);
if (packet_io->config.enable_iouring) {
io_uring_write(thread->tap_ctx->io_uring_s, raw_data, raw_len);
}
else {
tap_write(thread->tap_ctx->tap_s, raw_data, raw_len, logger);
}
}
else {
add_ether_header(raw_data, packet_io->config.tap_s_mac, packet_io->config.tap_c_mac);
throughput_metrics_inc(&packet_io_fs->tap_c_pkt_tx, 1, raw_len);
if (packet_io->config.enable_iouring) {
io_uring_write(thread->tap_ctx->io_uring_c, raw_data, raw_len);
}
else {
tap_write(thread->tap_ctx->tap_c, raw_data, raw_len, logger);
}
}
}
else
{
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
s_ctx->c2s_info.sids = meta.sids;
s_ctx->c2s_info.is_e2i_dir = meta.is_e2i_dir;
header = s_ctx->c2s_info.header_data;
header_len = s_ctx->c2s_info.header_len;
is_ipv4 = s_ctx->c2s_info.is_ipv4;
throughput_metrics_inc(&s_ctx->c2s_info.rx, 1, raw_len);
}
else {
s_ctx->s2c_info.sids = meta.sids;
s_ctx->s2c_info.is_e2i_dir = meta.is_e2i_dir;
header = s_ctx->s2c_info.header_data;
header_len = s_ctx->s2c_info.header_len;
is_ipv4 = s_ctx->s2c_info.is_ipv4;
throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len);
}
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 0);
if (header != NULL) {
char *packet_buff = NULL;
int packet_len = sizeof(struct ethhdr) + raw_len - header_len;
packet_buff = (char *)calloc(packet_len, sizeof(char));
memcpy(packet_buff + sizeof(struct ethhdr), raw_data + header_len, raw_len - header_len);
add_ether_header(packet_buff, packet_io->config.src_mac, packet_io->config.tap_mac);
if (is_ipv4)
add_ether_proto(packet_buff, ETH_P_IP);
else
add_ether_proto(packet_buff, ETH_P_IPV6);
if (packet_io->config.enable_iouring) {
io_uring_write(thread->tap_ctx->io_uring_fd, packet_buff, packet_len);
}
else {
tap_write(thread->tap_ctx->tap_fd, packet_buff, packet_len, logger);
}
throughput_metrics_inc(&packet_io_fs->tap_pkt_tx, 1, packet_len);
if (packet_buff)
free(packet_buff);
}
else {
// send to tap0
add_ether_header(raw_data, packet_io->config.src_mac, packet_io->config.tap_mac);
if (packet_io->config.enable_iouring) {
io_uring_write(thread->tap_ctx->io_uring_fd, raw_data, raw_len);
}
else {
tap_write(thread->tap_ctx->tap_fd, raw_data, raw_len, logger);
}
throughput_metrics_inc(&packet_io_fs->tap_pkt_tx, 1, raw_len);
}
flag = tfe_cmsg_get_flag(s_ctx->cmsg);
if (flag & TFE_CMSG_FLAG_USER0) {
send_event_log(s_ctx, thread_seq, ctx);
tfe_cmsg_set_flag(s_ctx->cmsg, TFE_CMSG_FLAG_INIT);
}
}
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
return 0;
}
/******************************************************************************
* EXTERN
******************************************************************************/
int is_enable_iouring(struct packet_io *handle)
{
return handle->config.enable_iouring;
}
void tfe_tap_ctx_destory(struct tap_ctx *handler)
{
if (handler) {
io_uring_instance_destory(handler->io_uring_fd);
io_uring_instance_destory(handler->io_uring_c);
io_uring_instance_destory(handler->io_uring_s);
if (handler->eventfd > 0)
close(handler->eventfd);
if (handler->eventfd_c > 0)
close(handler->eventfd_c);
if (handler->eventfd_s > 0)
close(handler->eventfd_s);
tap_close(handler->tap_fd);
tap_close(handler->tap_c);
tap_close(handler->tap_s);
free(handler);
handler = NULL;
}
}
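// Builds the per-thread tap context: opens the configured tap, tap_c, and tap_s
// multi-queue devices, brings their links up, and creates one non-blocking
// eventfd per tap. When io_uring is enabled, the tap BPF object is attached to
// each tap, an io_uring instance is created per tap, and the eventfds are
// registered with the marsio poll loop; when tap_rps_enable is set, the RPS mask
// is applied as well. Returns NULL (after cleanup) if io_uring or RPS setup fails.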
struct tap_ctx *tfe_tap_ctx_create(void *ctx)
{
int ret = 0;
struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)ctx;
struct packet_io *packet_io = thread_ctx->ref_io;
struct tap_ctx *tap_ctx = (struct tap_ctx *)calloc(1, sizeof(struct tap_ctx));
assert(tap_ctx != NULL);
tap_ctx->tap_fd = tap_open(packet_io->config.dev_tap, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE);
tap_ctx->tap_c = tap_open(packet_io->config.dev_tap_c, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE);
tap_ctx->tap_s = tap_open(packet_io->config.dev_tap_s, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE);
tap_up_link(packet_io->config.dev_tap);
tap_up_link(packet_io->config.dev_tap_c);
tap_up_link(packet_io->config.dev_tap_s);
// setting non-blocking via fcntl(2) does not take effect here, so create the eventfds with EFD_NONBLOCK
tap_ctx->eventfd = eventfd(0, EFD_NONBLOCK);
tap_ctx->eventfd_c = eventfd(0, EFD_NONBLOCK);
tap_ctx->eventfd_s = eventfd(0, EFD_NONBLOCK);
if (packet_io->config.enable_iouring) {
bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_fd);
bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_c);
bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_s);
tap_ctx->io_uring_fd = io_uring_instance_create(tap_ctx->tap_fd, tap_ctx->eventfd, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog);
if (tap_ctx->io_uring_fd == NULL)
goto error_out;
tap_ctx->io_uring_c = io_uring_instance_create(tap_ctx->tap_c, tap_ctx->eventfd_c, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog);
if (tap_ctx->io_uring_c == NULL)
goto error_out;
tap_ctx->io_uring_s = io_uring_instance_create(tap_ctx->tap_s, tap_ctx->eventfd_s, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog);
if (tap_ctx->io_uring_s == NULL)
goto error_out;
marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd, thread_ctx->thread_index);
marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd_c, thread_ctx->thread_index);
marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd_s, thread_ctx->thread_index);
}
if (packet_io->config.tap_rps_enable)
{
ret = tap_set_rps(packet_io->config.dev_tap, thread_ctx->thread_index, packet_io->config.tap_rps_mask);
if (ret != 0)
goto error_out;
ret = tap_set_rps(packet_io->config.dev_tap_c, thread_ctx->thread_index, packet_io->config.tap_rps_mask);
if (ret != 0)
goto error_out;
ret = tap_set_rps(packet_io->config.dev_tap_s, thread_ctx->thread_index, packet_io->config.tap_rps_mask);
if (ret != 0)
goto error_out;
}
return tap_ctx;
error_out:
tfe_tap_ctx_destory(tap_ctx);
return NULL;
}
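// Per-worker-thread setup: initializes marsio state for the calling thread.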
int packet_io_thread_init(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx, void *logger)
{
if (marsio_thread_init(handle->instance) != 0)
{
TFE_LOG_ERROR(logger, "%s: unable to init marsio thread %d", LOG_TAG_PKTIO, thread_ctx->thread_index);
return -1;
}
return 0;
}
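// Blocks the calling worker thread for up to timeout_ms milliseconds waiting for
// activity on the network-function interface.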
void packet_io_thread_wait(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx, int timeout_ms)
{
struct mr_vdev *vdevs[] = {handle->dev_nf_interface.mr_dev};
marsio_poll_wait(handle->instance, vdevs, 1, thread_ctx->thread_index, timeout_ms);
}
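// Releases everything owned by the packet-io handle: the NF sendpath and device,
// the marsio instance, the loaded tap BPF object, and the handle itself.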
void packet_io_destory(struct packet_io *handle)
{
if (handle)
{
if (handle->dev_nf_interface.mr_path)
{
marsio_sendpath_destory(handle->dev_nf_interface.mr_path);
handle->dev_nf_interface.mr_path = NULL;
}
if (handle->dev_nf_interface.mr_dev)
{
marsio_close_device(handle->dev_nf_interface.mr_dev);
handle->dev_nf_interface.mr_dev = NULL;
}
if (handle->instance)
{
marsio_destory(handle->instance);
handle->instance = NULL;
}
if (handle->config.tap_bpf_ctx)
{
bpf_obj_unload(handle->config.tap_bpf_ctx);
handle->config.tap_bpf_ctx = NULL;
}
free(handle);
handle = NULL;
}
}
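// Creates the packet-io handle: loads the configuration from `profile`, validates
// the multithread tap/BPF settings, creates a marsio instance bound to `coremask`,
// and opens the network-function interface together with its sendpath.
// Returns NULL (after packet_io_destory) on any failure.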
struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask, void *logger)
{
int opt = 1;
struct packet_io *handle = (struct packet_io *)calloc(1, sizeof(struct packet_io));
assert(handle != NULL);
handle->thread_num = thread_num;
if (packet_io_config(profile, &(handle->config), logger) != 0)
{
goto error_out;
}
if (handle->config.tap_allow_mutilthread)
{
if (handle->config.bpf_hash_mode != 2 && handle->config.bpf_hash_mode != 4)
{
TFE_LOG_ERROR(logger, "%s: bpf_hash_mode[%d] invalid.", LOG_TAG_PKTIO, handle->config.bpf_hash_mode);
goto error_out;
}
handle->config.tap_bpf_ctx = bpf_obj_load(handle->config.bpf_obj, thread_num, handle->config.bpf_hash_mode, handle->config.bpf_debug_log);
if (handle->config.tap_bpf_ctx == NULL)
{
TFE_LOG_ERROR(logger, "%s: under mutilthread mode, Unable to load bpf object.", LOG_TAG_PKTIO);
goto error_out;
}
}
else if (thread_num > 1){
TFE_LOG_ERROR(logger, "%s: under tap mode, when disable tap_allow_mutilthread, only support one work thread.", LOG_TAG_PKTIO);
goto error_out;
}
handle->instance = marsio_create();
if (handle->instance == NULL)
{
TFE_LOG_ERROR(logger, "%s: unable to create marsio instance", LOG_TAG_PKTIO);
goto error_out;
}
if (marsio_option_set(handle->instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, coremask, sizeof(cpu_set_t)) != 0)
{
TFE_LOG_ERROR(logger, "%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO);
goto error_out;
}
if (marsio_option_set(handle->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0)
{
TFE_LOG_ERROR(logger, "%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO);
goto error_out;
}
if (marsio_init(handle->instance, handle->config.app_symbol) != 0)
{
TFE_LOG_ERROR(logger, "%s: unable to initialize marsio instance", LOG_TAG_PKTIO);
goto error_out;
}
// Network Function Interface
handle->dev_nf_interface.mr_dev = marsio_open_device(handle->instance, handle->config.dev_nf_interface, handle->thread_num, handle->thread_num);
if (handle->dev_nf_interface.mr_dev == NULL)
{
TFE_LOG_ERROR(logger, "%s: unable to open device %s", LOG_TAG_PKTIO, handle->config.dev_nf_interface);
goto error_out;
}
handle->dev_nf_interface.mr_path = marsio_sendpath_create_by_vdev(handle->dev_nf_interface.mr_dev);
if (handle->dev_nf_interface.mr_path == NULL)
{
TFE_LOG_ERROR(logger, "%s: unable to create sendpath for device %s", LOG_TAG_PKTIO, handle->config.dev_nf_interface);
goto error_out;
}
return handle;
error_out:
packet_io_destory(handle);
return NULL;
}
// Polls the network-function interface for a burst of packets and dispatches them
// (bypass, keepalive, control, or raw handling). Returns the number of packets received.
int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
marsio_buff_t *rx_buffs[RX_BURST_MAX];
// nr_recv <= rx_burst_max <= RX_BURST_MAX
int nr_recv = marsio_recv_burst(handle->dev_nf_interface.mr_dev, thread_seq, rx_buffs, handle->config.rx_burst_max);
if (nr_recv <= 0)
{
return 0;
}
if (handle->config.bypass_all_traffic == 1)
{
for (int i = 0; i < nr_recv; i++)
{
int raw_len = marsio_buff_datalen(rx_buffs[i]);
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_pkt_tx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
}
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, rx_buffs, nr_recv);
return nr_recv;
}
for (int j = 0; j < nr_recv; j++)
{
marsio_buff_t *rx_buff = rx_buffs[j];
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
if (is_downstream_keepalive_packet(rx_buff))
{
throughput_metrics_inc(&packet_io_fs->keepalived_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->keepalived_pkt_tx, 1, raw_len);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
continue;
}
if (thread->ref_acceptor_ctx->debug)
{
struct tuple4 inner_addr;
struct packet pkt;
memset(&inner_addr, 0, sizeof(struct tuple4));
packet_parse(&pkt, (const char *)raw_data, raw_len);
packet_get_innermost_tuple4(&pkt, &inner_addr);
uint16_t ipid = 0;
const struct layer_record *ipv4_layer = packet_get_innermost_layer(&pkt, LAYER_TYPE_IPV4);
if (ipv4_layer)
{
ipid = ipv4_hdr_get_ipid((const struct ip *)ipv4_layer->hdr_ptr);
}
char buffer[128] = {0};
tuple4_tostring(&inner_addr, buffer, sizeof(buffer));
TFE_LOG_DEBUG(thread->logger, "recv packet %s (ipid: %u)", buffer, ipid);
}
if (marsio_buff_is_ctrlbuf(rx_buff))
{
throughput_metrics_inc(&packet_io_fs->ctrl_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->ctrl_pkt_tx, 1, raw_len);
// all control packets must be bypassed back to the NF interface
handle_control_packet(handle, rx_buff, thread_seq, ctx);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
}
else
{
handle_raw_packet_from_nf(handle, rx_buff, thread_seq, ctx);
}
}
return nr_recv;
}
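// Handles one decrypted packet read back from the client/server taps: looks up
// the session by the inner 4-tuple, copies the payload into a marsio buffer
// tagged with decrypted metadata, the acceptor sids, and the saved per-direction
// route context, and re-injects it toward the NF interface (with rehash).
// Packets that miss the session table are counted as decrypt_rxdrop and dropped.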
void handle_decryption_packet_from_tap(const char *data, int len, void *args)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)args;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
struct tuple4 inner_addr;
struct packet pkt;
void * logger = thread->logger;
memset(&inner_addr, 0, sizeof(struct tuple4));
packet_parse(&pkt, (const char *)data, len);
packet_get_innermost_tuple4(&pkt, &inner_addr);
struct session_node *node = session_table_search_by_addr(thread->session_table, &inner_addr);
if (node == NULL) {
if (thread->ref_acceptor_ctx->debug)
{
char buffer[128] = {0};
tuple4_tostring(&inner_addr, buffer, sizeof(buffer));
uint16_t ipid = 0;
const struct layer_record *ipv4_layer = packet_get_innermost_layer(&pkt, LAYER_TYPE_IPV4);
if (ipv4_layer)
{
ipid = ipv4_hdr_get_ipid((const struct ip *)ipv4_layer->hdr_ptr);
}
TFE_LOG_ERROR(logger, "decypted packet from tap %s (ipid: %u) miss session table", buffer, ipid);
}
throughput_metrics_inc(&packet_io_fs->decrypt_rxdrop, 1, len);
return;
}
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
marsio_buff_t *tx_buffs[1];
int alloc_ret = marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread->thread_index);
if (alloc_ret < 0){
TFE_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret = %d, thread_seq = %d",
alloc_ret, thread->thread_index);
throughput_metrics_inc(&packet_io_fs->decrypt_rxdrop, 1, len);
return;
}
char *dst = marsio_buff_append(tx_buffs[0], len);
if (dst == NULL)
{
// free the buffer allocated above so it is not leaked on this error path
marsio_buff_free(packet_io->instance, tx_buffs, 1, 0, thread->thread_index);
throughput_metrics_inc(&packet_io_fs->decrypt_rxdrop, 1, len);
TFE_LOG_ERROR(logger, "%s: unable to send decrypted packet, marsio_buff_append returned NULL.", LOG_TAG_PKTIO);
return;
}
memcpy(dst, data, len);
struct metadata meta = {0};
meta.session_id = s_ctx->session_id;
meta.raw_data = dst;
meta.raw_len = len;
meta.is_decrypted = SET_TRAFFIC_IS_DECRYPTED(0);
meta.is_ctrl_pkt = 0;
meta.l7offset = 0;
meta.sids.num = 2;
meta.sids.elems[0] = acceptor_ctx->sce_sids;
meta.sids.elems[1] = acceptor_ctx->proxy_sids;
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
meta.route_ctx = s_ctx->c2s_info.route_ctx;
throughput_metrics_inc(&packet_io_fs->tap_c_pkt_rx, 1, len);
}
else {
meta.route_ctx = s_ctx->s2c_info.route_ctx;
throughput_metrics_inc(&packet_io_fs->tap_s_pkt_rx, 1, len);
}
packet_io_set_metadata(tx_buffs[0], &meta, logger);
throughput_metrics_inc(&packet_io_fs->decrypt_tx, 1, len);
marsio_send_burst_with_options(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1, MARSIO_SEND_OPT_REHASH);
}
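// Handles one plaintext packet read back from tap0: looks up the session by the
// inner 4-tuple, swaps the tap Ethernet header for the saved outer header of
// that direction (if any), attaches the saved sids/route metadata, optionally
// records the packet in the duplicate-packet bloom filter, and sends it out
// through the NF interface (with rehash).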
void handle_raw_packet_from_tap(const char *data, int len, void *args)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)args;
struct packet_io *packet_io = thread->ref_io;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
struct tuple4 inner_addr;
struct packet pkt;
struct metadata meta = {0};
void * logger = thread->logger;
char *dst = NULL;
char *header = NULL;
int header_len = 0;
int packet_len = 0;
memset(&inner_addr, 0, sizeof(struct tuple4));
packet_parse(&pkt, (const char *)data, len);
packet_get_innermost_tuple4(&pkt, &inner_addr);
throughput_metrics_inc(&packet_io_fs->tap_pkt_rx, 1, len);
struct session_node *node = session_table_search_by_addr(thread->session_table, &inner_addr);
if (node == NULL) {
throughput_metrics_inc(&packet_io_fs->tap_pkt_rxdrop, 1, len);
if (thread->ref_acceptor_ctx->debug)
{
char buffer[128] = {0};
tuple4_tostring(&inner_addr, buffer, sizeof(buffer));
uint16_t ipid = 0;
const struct layer_record *ipv4_layer = packet_get_innermost_layer(&pkt, LAYER_TYPE_IPV4);
if (ipv4_layer)
{
ipid = ipv4_hdr_get_ipid((const struct ip *)ipv4_layer->hdr_ptr);
}
TFE_LOG_ERROR(logger, "raw packet from tap %s (ipid: %u) miss session table", buffer, ipid);
}
return;
}
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
marsio_buff_t *tx_buffs[1];
int alloc_ret = marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread->thread_index);
if (alloc_ret < 0){
TFE_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret = %d, thread_seq = %d",
alloc_ret, thread->thread_index);
throughput_metrics_inc(&packet_io_fs->tap_pkt_rxdrop, 1, len);
return;
}
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0)
{
header = s_ctx->c2s_info.header_data;
header_len = s_ctx->c2s_info.header_len;
meta.sids = s_ctx->c2s_info.sids;
meta.route_ctx = s_ctx->c2s_info.route_ctx;
}
else
{
header = s_ctx->s2c_info.header_data;
header_len = s_ctx->s2c_info.header_len;
meta.sids = s_ctx->s2c_info.sids;
meta.route_ctx = s_ctx->s2c_info.route_ctx;
}
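// Reverse of the rx-side rewrite: drop the tap Ethernet header and prepend the
// saved outer/tunnel header for this direction, so the packet leaves the NF
// interface in its original framing.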
if (header != NULL) {
packet_len = len + header_len - sizeof(struct ethhdr);
dst = marsio_buff_append(tx_buffs[0], packet_len);
if (dst == NULL)
{
// free the buffer allocated above so it is not leaked on this error path
marsio_buff_free(packet_io->instance, tx_buffs, 1, 0, thread->thread_index);
throughput_metrics_inc(&packet_io_fs->tap_pkt_rxdrop, 1, len);
TFE_LOG_ERROR(logger, "%s: unable to send raw packet, marsio_buff_append returned NULL.", LOG_TAG_PKTIO);
return;
}
memcpy(dst, header, header_len);
memcpy(dst + header_len, data + sizeof(struct ethhdr), len - sizeof(struct ethhdr));
}
else {
packet_len = len;
dst = marsio_buff_append(tx_buffs[0], len);
if (dst == NULL)
{
// free the buffer allocated above so it is not leaked on this error path
marsio_buff_free(packet_io->instance, tx_buffs, 1, 0, thread->thread_index);
throughput_metrics_inc(&packet_io_fs->tap_pkt_rxdrop, 1, len);
TFE_LOG_ERROR(logger, "%s: unable to send raw packet, marsio_buff_append returned NULL.", LOG_TAG_PKTIO);
return;
}
memcpy(dst, data, len);
}
meta.session_id = s_ctx->session_id;
meta.raw_data = dst;
meta.raw_len = packet_len;
meta.is_decrypted = 0;
meta.is_ctrl_pkt = 0;
meta.l7offset = 0;
packet_io_set_metadata(tx_buffs[0], &meta, logger);
throughput_metrics_inc(&packet_io_fs->raw_pkt_tx, 1, packet_len);
if (thread->ref_acceptor_ctx->dup_packet_filter_enable == 1)
{
add_packet_to_dablooms(&pkt, thread->dup_packet_filter);
}
marsio_send_burst_with_options(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1, MARSIO_SEND_OPT_REHASH);
}