#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "tfe_ctrl_packet.h" #include "tfe_raw_packet.h" #include "io_uring.h" #include "tfe_packet_io_fs.h" #include "tfe_cmsg.h" #include "tfe_tcp_restore.h" #include "tfe_stream.h" #include "packet_construct.h" #include "mpack.h" #include "tap.h" #include "bpf_obj.h" #include "tfe_session_table.h" #include "tfe_packet_io.h" #include "tfe_fieldstat.h" /****************************************************************************** * Struct ******************************************************************************/ #define RX_BURST_MAX 128 #define IS_SINGLE 0x01 #define IS_TUNNEL 0x02 #define TRAFFIC_IS_DECRYPTED (1 << 0) #define SET_TRAFFIC_IS_DECRYPTED(field) (field | TRAFFIC_IS_DECRYPTED) #define CLEAR_TRAFFIC_IS_DECRYPTED(field) (field & ~TRAFFIC_IS_DECRYPTED) struct config { int bypass_all_traffic; int rx_burst_max; int enable_iouring; int enable_debuglog; int ring_size; int buff_size; int flags; int sq_thread_idle; int bpf_debug_log; int bpf_hash_mode; int tap_allow_mutilthread; char bpf_obj[1024]; char src_mac[6]; char tap_mac[6]; char tap_c_mac[6]; char tap_s_mac[6]; char dev_tap[16]; char dev_tap_c[16]; char dev_tap_s[16]; int tap_rps_enable; char tap_rps_mask[TFE_SYMBOL_MAX]; char app_symbol[256]; char dev_nf_interface[256]; struct bpf_obj_ctx *tap_bpf_ctx; }; struct device { struct mr_vdev *mr_dev; struct mr_sendpath *mr_path; }; struct packet_io { int thread_num; struct mr_instance *instance; struct device dev_nf_interface; struct config config; }; enum raw_pkt_action { RAW_PKT_ERR_BYPASS, RAW_PKT_HIT_BYPASS, RAW_PKT_HIT_BLOCK, RAW_PKT_HIT_STEERING, RAW_PKT_HIT_MIRRORING, }; enum inject_pkt_action { INJT_PKT_ERR_DROP, INJT_PKT_MIRR_RX_DROP, INJT_PKT_HIT_BLOCK, INJT_PKT_HIT_FWD2SF, // forward to service function INJT_PKT_HIT_FWD2NF, // forward to network function }; struct metadata { int write_ref; uint64_t session_id; char *raw_data; int raw_len; int is_e2i_dir; int is_ctrl_pkt; uint16_t l7offset; // only control packet set l7offset uint16_t is_decrypted; struct sids sids; struct route_ctx route_ctx; }; extern int tcp_policy_enforce(struct tcp_policy_enforcer *tcp_enforcer, struct tfe_cmsg *cmsg); extern int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstream, int fd_fake_c, int fd_fake_s, struct tfe_cmsg * cmsg); extern void chaining_policy_enforce(struct chaining_policy_enforcer *enforcer, struct tfe_cmsg *cmsg, uint64_t rule_id); /****************************************************************************** * STATIC ******************************************************************************/ // return 0 : not keepalive packet // return 1 : is keepalive packet static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff) { int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr))) { return 0; } struct ethhdr *eth_hdr = (struct ethhdr *)raw_data; if (eth_hdr->h_proto == 0xAAAA) { return 1; } else { return 0; } } static int tap_write(int tap_fd, const char *data, int data_len, void *logger) { int ret = write(tap_fd, data, data_len); if (ret != data_len) { TFE_LOG_ERROR(logger, "%s: need send %dB, only send %dB, aborting: %s", LOG_TAG_PKTIO, data_len, ret, strerror(errno)); } return ret; } static struct metadata *metadata_new() { struct metadata *meta = (struct metadata *)calloc(1, sizeof(struct metadata)); return meta; } static int metadata_is_empty(struct metadata *meta) { return meta->write_ref == 0 ? 1 : 0; } static void metadata_deep_copy(struct metadata *dst, struct metadata *src) { dst->write_ref++; dst->session_id = src->session_id; dst->raw_data = (char *)calloc(src->raw_len, sizeof(char)); memcpy(dst->raw_data, src->raw_data, src->raw_len); dst->raw_len = src->raw_len; dst->l7offset = src->l7offset; dst->is_e2i_dir = src->is_e2i_dir; dst->is_ctrl_pkt = src->is_ctrl_pkt; dst->is_decrypted = src->is_decrypted; } void metadata_free(struct metadata *meta) { if (meta) { if (meta->raw_data) { free(meta->raw_data); meta->raw_data = NULL; } free(meta); meta = NULL; } } static struct session_ctx *session_ctx_new() { struct session_ctx *ctx = (struct session_ctx *)calloc(1, sizeof(struct session_ctx)); assert(ctx != NULL); return ctx; } static void session_ctx_free(struct session_ctx *ctx) { if (ctx) { if (ctx->session_addr) { free(ctx->session_addr); ctx->session_addr = NULL; } if (ctx->cmsg) { tfe_cmsg_destroy(&ctx->cmsg); } if (ctx->raw_meta_i2e) { metadata_free(ctx->raw_meta_i2e); ctx->raw_meta_i2e = NULL; } if (ctx->raw_meta_e2i) { metadata_free(ctx->raw_meta_e2i); ctx->raw_meta_e2i = NULL; } if (ctx->ctrl_meta) { metadata_free(ctx->ctrl_meta); ctx->ctrl_meta = NULL; } if (ctx->c2s_info.header_data) { free(ctx->c2s_info.header_data); ctx->c2s_info.header_data = NULL; } if (ctx->s2c_info.header_data) { free(ctx->s2c_info.header_data); ctx->s2c_info.header_data = NULL; } free(ctx); ctx = NULL; } } static void session_value_free_cb(void *ctx) { struct session_ctx *s_ctx = (struct session_ctx *)ctx; session_ctx_free(s_ctx); } static int add_ether_header(void *raw_data, char *src_mac, char *dst_mac){ struct ethhdr *ether_hdr = (struct ethhdr*)raw_data; memcpy(ether_hdr->h_dest, dst_mac, sizeof(ether_hdr->h_dest)); memcpy(ether_hdr->h_source, src_mac, sizeof(ether_hdr->h_source)); return 0; } static int add_ether_proto(void *raw_data, uint16_t proto){ struct ethhdr *ether_hdr = (struct ethhdr*)raw_data; ether_hdr->h_proto = htons(proto); // ETH_P_IP return 0; } static int overwrite_tcp_mss(struct tfe_cmsg *cmsg, struct tcp_restore_info *restore, uint64_t session_id, void *logger) { int ret = 0; uint16_t size = 0; int server_side_mss_enable = 0; int server_side_mss_value = 0; int client_side_mss_enable = 0; int client_side_mss_value = 0; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_DOWNSTREAM_TCP_MSS_ENABLE, (unsigned char *)&client_side_mss_enable, sizeof(client_side_mss_enable), &size); if (ret < 0) { TFE_LOG_ERROR(logger, "%s: session %lu failed at fetch client side tcp mss from cmsg: %s", LOG_TAG_PKTIO, session_id, strerror(-ret)); return -1; } ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_DOWNSTREAM_TCP_MSS_VALUE, (unsigned char *)&client_side_mss_value, sizeof(client_side_mss_value), &size); if (ret < 0) { TFE_LOG_ERROR(logger, "%s: session %lu failed at fetch client side tcp mss value from cmsg: %s", LOG_TAG_PKTIO, session_id, strerror(-ret)); return -1; } ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_UPSTREAM_TCP_MSS_ENABLE, (unsigned char *)&server_side_mss_enable, sizeof(server_side_mss_enable), &size); if (ret < 0) { TFE_LOG_ERROR(logger, "%s: session %lu failed at fetch server side tcp mss from cmsg: %s", LOG_TAG_PKTIO, session_id, strerror(-ret)); return -1; } ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_UPSTREAM_TCP_MSS_VALUE, (unsigned char *)&server_side_mss_value, sizeof(server_side_mss_value), &size); if (ret < 0) { TFE_LOG_ERROR(logger, "%s: session %lu failed at fetch server side tcp mss value from cmsg: %s", LOG_TAG_PKTIO, session_id, strerror(-ret)); return -1; } if (client_side_mss_enable) { restore->client.mss = client_side_mss_value; } if (server_side_mss_enable) { restore->server.mss = server_side_mss_value; } return 0; } static int tcp_restore_set_from_cmsg(struct tfe_cmsg *cmsg, struct tcp_restore_info *restore_info) { int ret = 0; uint16_t length = 0; uint32_t seq; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_SEQ, (unsigned char *)&seq, sizeof(uint32_t), &length); if (ret == 0) { restore_info->client.seq = ntohl(seq); restore_info->server.ack = ntohl(seq); } uint32_t ack; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_ACK, (unsigned char *)&ack, sizeof(uint32_t), &length); if (ret == 0) { restore_info->client.ack = ntohl(ack); restore_info->server.seq = ntohl(ack); } uint8_t ts_client; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT, (unsigned char *)&ts_client, sizeof(uint8_t), &length); if (ret == 0) { restore_info->client.timestamp_perm = !!ts_client; } uint8_t ts_server; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER, (unsigned char *)&ts_server, sizeof(uint8_t), &length); if (ret == 0) { restore_info->server.timestamp_perm = !!ts_server; } uint32_t ts_client_val; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT_VAL, (unsigned char *)&ts_client_val, sizeof(uint32_t), &length); if (ret == 0) { restore_info->client.ts_val = ntohl(ts_client_val); } uint32_t ts_server_val; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER_VAL, (unsigned char *)&ts_server_val, sizeof(uint32_t), &length); if (ret == 0) { restore_info->server.ts_val = ntohl(ts_server_val); } uint8_t wsacle_client; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, (unsigned char *)&wsacle_client, sizeof(uint8_t), &length); if (ret == 0) { restore_info->client.wscale_perm = true; restore_info->client.wscale = wsacle_client; } uint8_t wsacle_server; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (unsigned char *)&wsacle_server, sizeof(uint8_t), &length); if (ret == 0) { restore_info->server.wscale_perm = true; restore_info->server.wscale = wsacle_server; } uint8_t sack_client; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_SACK_CLIENT, (unsigned char *)&sack_client, sizeof(uint8_t), &length); if (ret == 0) { restore_info->client.sack_perm = !!sack_client; } uint8_t sack_server; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_SACK_SERVER, (unsigned char *)&sack_server, sizeof(uint8_t), &length); if (ret == 0) { restore_info->server.sack_perm = !!sack_server; } uint16_t mss_client; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_MSS_CLIENT, (unsigned char *)&mss_client, sizeof(uint16_t), &length); if (ret == 0) { restore_info->client.mss = mss_client; } uint16_t mss_server; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_MSS_SERVER, (unsigned char *)&mss_server, sizeof(uint16_t), &length); if (ret == 0) { restore_info->server.mss = mss_server; } uint16_t window_client; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT, (unsigned char *)&window_client, sizeof(uint16_t), &length); if (ret == 0) { restore_info->client.window = window_client; } uint16_t window_server; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_SERVER, (unsigned char *)&window_server, sizeof(uint16_t), &length); if (ret == 0) { restore_info->server.window = window_server; } uint8_t packet_cur_dir; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_INFO_PACKET_CUR_DIR, (unsigned char *)&packet_cur_dir, sizeof(uint8_t), &length); if (ret == 0) { restore_info->cur_dir = (enum tcp_restore_pkt_dir)packet_cur_dir; } return 0; } static int tcp_restore_set_from_pkg(struct addr_tuple4 *tuple4, struct tcp_restore_info *restore_info) { if (tuple4->addr_type == ADDR_TUPLE4_TYPE_V4) { struct sockaddr_in *in_addr_client; struct sockaddr_in *in_addr_server; if (restore_info->cur_dir == PKT_DIR_NOT_SET || restore_info->cur_dir == PKT_DIR_C2S) { in_addr_client = (struct sockaddr_in *)&restore_info->client.addr; in_addr_server = (struct sockaddr_in *)&restore_info->server.addr; } else { in_addr_client = (struct sockaddr_in *)&restore_info->server.addr; in_addr_server = (struct sockaddr_in *)&restore_info->client.addr; } in_addr_client->sin_family = AF_INET; in_addr_client->sin_addr = tuple4->addr_v4.src_addr; in_addr_client->sin_port = tuple4->src_port; in_addr_server->sin_family = AF_INET; in_addr_server->sin_addr = tuple4->addr_v4.dst_addr; in_addr_server->sin_port = tuple4->dst_port; } if (tuple4->addr_type == ADDR_TUPLE4_TYPE_V6) { struct sockaddr_in6 *in6_addr_client; struct sockaddr_in6 *in6_addr_server; if (restore_info->cur_dir == PKT_DIR_NOT_SET || restore_info->cur_dir == PKT_DIR_C2S) { in6_addr_client = (struct sockaddr_in6 *)&restore_info->client.addr; in6_addr_server = (struct sockaddr_in6 *)&restore_info->server.addr; } else { in6_addr_client = (struct sockaddr_in6 *)&restore_info->server.addr; in6_addr_server = (struct sockaddr_in6 *)&restore_info->client.addr; } in6_addr_client->sin6_family = AF_INET6; in6_addr_client->sin6_addr = tuple4->addr_v6.src_addr; in6_addr_client->sin6_port = tuple4->src_port; in6_addr_server->sin6_family = AF_INET6; in6_addr_server->sin6_addr = tuple4->addr_v6.dst_addr; in6_addr_server->sin6_port = tuple4->dst_port; } return 0; } // return 0 : success // return -1 : error static int packet_io_config(const char *profile, struct config *config, void *logger) { int ret = 0; MESA_load_profile_int_def(profile, "PACKET_IO", "bypass_all_traffic", (int *)&config->bypass_all_traffic, 0); MESA_load_profile_int_def(profile, "PACKET_IO", "rx_burst_max", (int *)&(config->rx_burst_max), 1); MESA_load_profile_string_nodef(profile, "PACKET_IO", "app_symbol", config->app_symbol, sizeof(config->app_symbol)); MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_nf_interface", config->dev_nf_interface, sizeof(config->dev_nf_interface)); MESA_load_profile_string_def(profile, "PACKET_IO", "tap_name", config->dev_tap, sizeof(config->dev_tap), "tap0"); MESA_load_profile_int_nodef(profile, "PACKET_IO", "tap_allow_mutilthread", &config->tap_allow_mutilthread); MESA_load_profile_string_nodef(profile, "PACKET_IO", "bpf_obj", config->bpf_obj, sizeof(config->bpf_obj)); MESA_load_profile_int_nodef(profile, "PACKET_IO", "bpf_debug_log", (int *)&config->bpf_debug_log); MESA_load_profile_int_nodef(profile, "PACKET_IO", "bpf_hash_mode", (int *)&config->bpf_hash_mode); MESA_load_profile_int_nodef(profile, "PACKET_IO", "tap_rps_enable", &config->tap_rps_enable); MESA_load_profile_string_nodef(profile, "PACKET_IO", "tap_rps_mask", config->tap_rps_mask, sizeof(config->tap_rps_mask)); MESA_load_profile_int_nodef(profile, "PACKET_IO", "enable_iouring", &config->enable_iouring); MESA_load_profile_int_nodef(profile, "PACKET_IO", "enable_debuglog", &config->enable_debuglog); MESA_load_profile_int_nodef(profile, "PACKET_IO", "ring_size", &config->ring_size); MESA_load_profile_int_nodef(profile, "PACKET_IO", "buff_size", &config->buff_size); MESA_load_profile_int_nodef(profile, "PACKET_IO", "flags", &config->flags); MESA_load_profile_int_nodef(profile, "PACKET_IO", "sq_thread_idle", &config->sq_thread_idle); MESA_load_profile_string_def(profile, "traffic_steering", "device_client", config->dev_tap_c, sizeof(config->dev_tap_c), "tap_c"); MESA_load_profile_string_def(profile, "traffic_steering", "device_server", config->dev_tap_s, sizeof(config->dev_tap_s), "tap_s"); char src_mac_addr[18] = {0}; ret = MESA_load_profile_string_nodef(profile, "PACKET_IO", "src_mac_addr", src_mac_addr, sizeof(src_mac_addr)); if(ret < 0){ TFE_LOG_ERROR(logger, "%s: invalid src_mac_addr: src_mac_addr not set, profile = %s, section = PACKET_IO", LOG_TAG_PKTIO, profile); return -1; } str_to_mac(src_mac_addr, config->src_mac); ret = get_mac_by_device_name(config->dev_tap, config->tap_mac); if (ret != 0) { TFE_LOG_ERROR(logger, "%s: invalid tap_name: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap); return -1; } ret = get_mac_by_device_name(config->dev_tap_c, config->tap_c_mac); if (ret != 0) { TFE_LOG_ERROR(logger, "%s: invalid device_client: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap_c); return -1; } ret = get_mac_by_device_name(config->dev_tap_s, config->tap_s_mac); if (ret != 0) { TFE_LOG_ERROR(logger, "%s: invalid device_server: unable get %s mac", LOG_TAG_PKTIO, config->dev_tap_s); return -1; } if (config->rx_burst_max > RX_BURST_MAX) { TFE_LOG_ERROR(logger, "%s: invalid rx_burst_max, exceeds limit %d", LOG_TAG_PKTIO, RX_BURST_MAX); return -1; } if (strlen(config->app_symbol) == 0) { TFE_LOG_ERROR(logger, "%s: invalid app_symbol in %s", LOG_TAG_PKTIO, profile); return -1; } if (strlen(config->dev_nf_interface) == 0) { TFE_LOG_ERROR(logger, "%s: invalid dev_nf_interface in %s", LOG_TAG_PKTIO, profile); return -1; } TFE_LOG_DEBUG(logger, "%s: PACKET_IO->bypass_all_traffic : %d", LOG_TAG_PKTIO, config->bypass_all_traffic); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->rx_burst_max : %d", LOG_TAG_PKTIO, config->rx_burst_max); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->app_symbol : %s", LOG_TAG_PKTIO, config->app_symbol); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->dev_nf_interface : %s", LOG_TAG_PKTIO, config->dev_nf_interface); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->tap_name : %s", LOG_TAG_PKTIO, config->tap_rps_mask); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->tap_allow_mutilthread : %d", LOG_TAG_PKTIO, config->tap_allow_mutilthread); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->bpf_obj : %s", LOG_TAG_PKTIO, config->bpf_obj); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->bpf_debug_log : %d", LOG_TAG_PKTIO, config->bpf_debug_log); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->bpf_hash_mode : %d", LOG_TAG_PKTIO, config->bpf_hash_mode); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->tap_rps_enable : %d", LOG_TAG_PKTIO, config->tap_rps_enable); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->tap_rps_mask : %s", LOG_TAG_PKTIO, config->tap_rps_mask); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->enable_iouring : %d", LOG_TAG_PKTIO, config->enable_iouring); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->enable_debuglog : %d", LOG_TAG_PKTIO, config->enable_debuglog); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->ring_size : %d", LOG_TAG_PKTIO, config->ring_size); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->buff_size : %d", LOG_TAG_PKTIO, config->buff_size); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->flags : %d", LOG_TAG_PKTIO, config->flags); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->sq_thread_idle : %d", LOG_TAG_PKTIO, config->sq_thread_idle); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->device_client : %s", LOG_TAG_PKTIO, config->dev_tap_c); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->device_server : %s", LOG_TAG_PKTIO, config->dev_tap_s); TFE_LOG_DEBUG(logger, "%s: PACKET_IO->src_mac_addr : %s", LOG_TAG_PKTIO, src_mac_addr); return 0; } // return 0 : success // return -1 : error static int packet_io_get_metadata(marsio_buff_t *rx_buff, struct metadata *meta, void *logger) { memset(meta, 0, sizeof(struct metadata)); if (marsio_buff_get_metadata(rx_buff, MR_BUFF_SESSION_ID, &(meta->session_id), sizeof(meta->session_id)) <= 0) { TFE_LOG_ERROR(logger, "%s: unable to get session_id from metadata", LOG_TAG_PKTIO); return -1; } meta->raw_len = marsio_buff_datalen(rx_buff); meta->raw_data = marsio_buff_mtod(rx_buff); if (meta->raw_data == NULL || meta->raw_len == 0) { TFE_LOG_ERROR(logger, "%s: unable to get raw_data from metadata", LOG_TAG_PKTIO); return -1; } // 1: E2I // 0: I2E if (marsio_buff_get_metadata(rx_buff, MR_BUFF_DIR, &(meta->is_e2i_dir), sizeof(meta->is_e2i_dir)) <= 0) { TFE_LOG_ERROR(logger, "%s: unable to get buff_dir from metadata", LOG_TAG_PKTIO); return -1; } if (marsio_buff_is_ctrlbuf(rx_buff)) { meta->is_ctrl_pkt = 1; // only control packet set MR_BUFF_PAYLOAD_OFFSET if (marsio_buff_get_metadata(rx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7offset), sizeof(meta->l7offset)) <= 0) { TFE_LOG_ERROR(logger, "%s: unable to get l7offset from metadata", LOG_TAG_PKTIO); return -1; } } else { meta->is_ctrl_pkt = 0; uint16_t user_data = 0; if (marsio_buff_get_metadata(rx_buff, MR_BUFF_USER_0, &(user_data), sizeof(user_data)) <= 0) { TFE_LOG_ERROR(logger, "%s: unable to get is_decrypted from metadata", LOG_TAG_PKTIO); return -1; } if (user_data & TRAFFIC_IS_DECRYPTED) { meta->is_decrypted = 1; } else { meta->is_decrypted = 0; } } meta->route_ctx.len = marsio_buff_get_metadata(rx_buff, MR_BUFF_ROUTE_CTX, meta->route_ctx.data, sizeof(meta->route_ctx.data)); if (meta->route_ctx.len <= 0) { TFE_LOG_ERROR(logger, "%s: unable to get route_ctx from metadata", LOG_TAG_PKTIO); return -1; } meta->sids.num = marsio_buff_get_sid_list(rx_buff, meta->sids.elems, sizeof(meta->sids.elems) / sizeof(meta->sids.elems[0])); if (meta->sids.num < 0) { TFE_LOG_ERROR(logger, "%s: unable to get sid_list from metadata", LOG_TAG_PKTIO); return -1; } return 0; } // return 0 : success // return -1 : error static int packet_io_set_metadata(marsio_buff_t *tx_buff, struct metadata *meta, void *logger) { if (meta->session_id) { if (marsio_buff_set_metadata(tx_buff, MR_BUFF_SESSION_ID, &(meta->session_id), sizeof(meta->session_id)) != 0) { TFE_LOG_ERROR(logger, "%s: unable to set session_id for metadata", LOG_TAG_PKTIO); return -1; } } if (meta->is_ctrl_pkt) { marsio_buff_set_ctrlbuf(tx_buff); if (marsio_buff_set_metadata(tx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7offset), sizeof(meta->l7offset)) != 0) { TFE_LOG_ERROR(logger, "%s: unable to set l7offset for metadata", LOG_TAG_PKTIO); return -1; } } else { uint16_t user_data = 0; if (meta->is_decrypted) { user_data = SET_TRAFFIC_IS_DECRYPTED(user_data); } if (marsio_buff_set_metadata(tx_buff, MR_BUFF_USER_0, &(user_data), sizeof(user_data)) != 0) { TFE_LOG_ERROR(logger, "%s: unable to set is_decrypted for metadata", LOG_TAG_PKTIO); return -1; } } if (meta->route_ctx.len > 0) { if (marsio_buff_set_metadata(tx_buff, MR_BUFF_ROUTE_CTX, meta->route_ctx.data, meta->route_ctx.len) != 0) { TFE_LOG_ERROR(logger, "%s: unable to set route_ctx for metadata", LOG_TAG_PKTIO); return -1; } } if (meta->sids.num > 0) { if (marsio_buff_set_sid_list(tx_buff, meta->sids.elems, meta->sids.num) != 0) { TFE_LOG_ERROR(logger, "%s: unable to set sid_list for metadata", LOG_TAG_PKTIO); return -1; } } return 0; } __attribute__((unused))static void packet_io_dump_metadata(struct metadata *meta, void *logger) { TFE_LOG_ERROR(logger, "%s: META={session_id: %lu, raw_len: %d, is_e2i_dir: %d, is_ctrl_pkt: %d, l7offset: %d, is_decrypted: %u, sids_num: %d}", LOG_TAG_PKTIO, meta->session_id, meta->raw_len, meta->is_e2i_dir, meta->is_ctrl_pkt, meta->l7offset, meta->is_decrypted, meta->sids.num); } /* { "tsync": "2.0", "session_id": "123456789", "state": "active", "method": "log_update", "params": { "proxy": { "ssl_intercept_info": { mpack array } } } } */ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) { struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; marsio_buff_t *tx_buffs[1]; struct metadata meta = {0}; void * logger = thread->logger; uint16_t length = 0; uint8_t ssl_intercept_status = 0; uint64_t ssl_upstream_latency = 0; uint64_t ssl_downstream_latency = 0; char ssl_upstream_version[64] = {0}; uint16_t ssl_upstream_version_length = 0; char ssl_downstream_version[64] = {0}; uint16_t ssl_downstream_version_length = 0; uint8_t ssl_cert_verify = 0; char ssl_error[64] = {0}; uint16_t ssl_error_length = 0; char ssl_passthrough_reason[32] = {0}; uint16_t ssl_passthrough_reason_length = 0; uint8_t ssl_pinning_state = 0; char *data = NULL; size_t size; mpack_writer_t writer; if (s_ctx->protocol != STREAM_PROTO_SSL) return; mpack_writer_init_growable(&writer, &data, &size); // root map mpack_build_map(&writer); mpack_write_cstr(&writer, "tsync"); mpack_write_cstr(&writer, "2.0"); mpack_write_cstr(&writer, "session_id"); mpack_write_u64(&writer, s_ctx->session_id); mpack_write_cstr(&writer, "state"); mpack_write_cstr(&writer, "active"); mpack_write_cstr(&writer, "method"); mpack_write_cstr(&writer, "log_update"); mpack_write_cstr(&writer, "params"); // params map mpack_build_map(&writer); mpack_write_cstr(&writer, "proxy"); // proxy map mpack_build_map(&writer); mpack_write_cstr(&writer, "ssl_intercept_info"); mpack_build_array(&writer); tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, (unsigned char *)&ssl_intercept_status, sizeof(ssl_intercept_status), &length); tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_LATENCY, (unsigned char *)&ssl_upstream_latency, sizeof(ssl_upstream_latency), &length); tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_LATENCY, (unsigned char *)&ssl_downstream_latency, sizeof(ssl_downstream_latency), &length); tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_VERSION, (unsigned char *)ssl_upstream_version, sizeof(ssl_upstream_version), &length); tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_VERSION, (unsigned char *)ssl_downstream_version, sizeof(ssl_downstream_version), &ssl_downstream_version_length); tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&ssl_pinning_state, sizeof(ssl_pinning_state), &length); tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CERT_VERIFY, (unsigned char *)&ssl_cert_verify, sizeof(ssl_cert_verify), &length); tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_ERROR, (unsigned char *)ssl_error, sizeof(ssl_error), &ssl_error_length); tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PASSTHROUGH_REASON, (unsigned char *)ssl_passthrough_reason, sizeof(ssl_passthrough_reason), &ssl_passthrough_reason_length); mpack_write_u8(&writer, ssl_intercept_status); mpack_write_u64(&writer, ssl_upstream_latency); mpack_write_u64(&writer, ssl_downstream_latency); mpack_write_str(&writer, ssl_upstream_version, ssl_upstream_version_length); mpack_write_str(&writer, ssl_downstream_version, ssl_downstream_version_length); mpack_write_u8(&writer, ssl_pinning_state); mpack_write_u8(&writer, ssl_cert_verify); mpack_write_str(&writer, ssl_error, ssl_error_length); mpack_write_str(&writer, ssl_passthrough_reason, ssl_passthrough_reason_length); mpack_complete_array(&writer); // proxy map end mpack_complete_map(&writer); // params map end mpack_complete_map(&writer); // root map end mpack_complete_map(&writer); // finish writing if (mpack_writer_destroy(&writer) != mpack_ok) { assert(0); if (data) { free(data); data = NULL; } return; } char *raw_packet_header_data = s_ctx->ctrl_meta->raw_data; int raw_packet_header_len = s_ctx->ctrl_meta->l7offset; marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread_seq); char *dst = marsio_buff_append(tx_buffs[0], raw_packet_header_len + size); memcpy(dst, raw_packet_header_data, raw_packet_header_len); memcpy(dst + raw_packet_header_len, data, size); meta.session_id = s_ctx->session_id; meta.l7offset = raw_packet_header_len; meta.is_ctrl_pkt = 1; meta.sids.num = 1; meta.sids.elems[0] = acceptor_ctx->firewall_sids; route_ctx_copy(&meta.route_ctx, &s_ctx->ctrl_meta->route_ctx); packet_io_set_metadata(tx_buffs[0], &meta, logger); ATOMIC_INC(&(packet_io_fs->session_log)); marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_seq, tx_buffs, 1); if (data) free(data); return; } static void tcp_restore_info_dump(struct tcp_restore_info *info, uint64_t session_id, void *logger) { char str_client_addr[64] = { 0 }; char str_server_addr[64] = { 0 }; const struct tcp_restore_endpoint *client = &info->client; const struct tcp_restore_endpoint *server = &info->server; assert(client->addr.ss_family == server->addr.ss_family); if (client->addr.ss_family == AF_INET) { struct sockaddr_in *sk_client = (struct sockaddr_in *)&client->addr; struct sockaddr_in *sk_server = (struct sockaddr_in *)&server->addr; uint16_t port_client = ntohs(sk_client->sin_port); uint16_t port_server = ntohs(sk_server->sin_port); inet_ntop(AF_INET, &sk_client->sin_addr, str_client_addr, sizeof(str_client_addr)); inet_ntop(AF_INET, &sk_server->sin_addr, str_server_addr, sizeof(str_client_addr)); TFE_LOG_DEBUG(logger, "restore_info session %lu %s:%hu %s:%hu: cur_dir=%u\n" "\tclient={ addr=%s, port=%hu, seq:%u, ack:%u, ts_val:%u, mss=%u, window:%hu, wscale_perm=%u, wscale=%u, timestamp_perm=%u, sack_perm=%u }\n" "\tserver={ addr=%s, port=%hu, seq:%u, ack:%u, ts_val:%u, mss=%u, window:%hu, wscale_perm=%u, wscale=%u, timestamp_perm=%u, sack_perm=%u }", session_id, str_client_addr, port_client, str_server_addr, port_server, info->cur_dir, str_client_addr, port_client, client->seq, client->ack, client->ts_val, client->mss, client->window, (client->wscale_perm ? 1 : 0), client->wscale, (client->timestamp_perm ? 1 : 0), (client->sack_perm ? 1 : 0), str_server_addr, port_server, server->seq, server->ack, server->ts_val, server->mss, server->window, (server->wscale_perm ? 1 : 0), server->wscale, (server->timestamp_perm ? 1 : 0), (server->sack_perm ? 1 : 0)); } else if (client->addr.ss_family == AF_INET6) { struct sockaddr_in6 *sk_client = (struct sockaddr_in6 *)&client->addr; struct sockaddr_in6 *sk_server = (struct sockaddr_in6 *)&server->addr; uint16_t port_client = ntohs(sk_client->sin6_port); uint16_t port_server = ntohs(sk_server->sin6_port); inet_ntop(AF_INET6, &sk_client->sin6_addr, str_client_addr, sizeof(str_client_addr)); inet_ntop(AF_INET6, &sk_server->sin6_addr, str_server_addr, sizeof(str_client_addr)); TFE_LOG_DEBUG(logger, "restore_info session %lu %s:%hu %s:%hu: tcp_restore_info %p cur_dir=%u\n" "\tclient={ addr=%s, port=%hu, seq:%u, ack:%u, ts_val:%u, mss=%u, window:%hu, wscale_perm=%u, wscale=%u, timestamp_perm=%u, sack_perm=%u }\n" "\tserver={ addr=%s, port=%hu, seq:%u, ack:%u, ts_val:%u, mss=%u, window:%hu, wscale_perm=%u, wscale=%u, timestamp_perm=%u, sack_perm=%u }", session_id, str_client_addr, port_client, str_server_addr, port_server, info, info->cur_dir, str_client_addr, port_client, client->seq, client->ack, client->ts_val, client->mss, client->window, (client->wscale_perm ? 1 : 0), client->wscale, (client->timestamp_perm ? 1 : 0), (client->sack_perm ? 1 : 0), str_server_addr, port_server, server->seq, server->ack, server->ts_val, server->mss, server->window, (server->wscale_perm ? 1 : 0), server->wscale, (server->timestamp_perm ? 1 : 0), (server->sack_perm ? 1 : 0)); } } static void set_passthrough_reason(struct tfe_cmsg *cmsg, char *reason) { uint8_t ssl_intercept_status = SSL_ACTION_PASSTHROUGH; tfe_cmsg_set(cmsg, TFE_CMSG_SSL_PASSTHROUGH_REASON, (const unsigned char *)reason, strlen(reason)); tfe_cmsg_set(cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, (const unsigned char *)&ssl_intercept_status, (uint16_t)sizeof(ssl_intercept_status)); tfe_cmsg_set_flag(cmsg, TFE_CMSG_FLAG_USER0); } typedef int tcp_handshake_fn(struct tcp_restore_info *info, struct ether_addr *client_mac, struct ether_addr *server_mac, char *buffer, int size); static void packet_io_send_fake_pkt(struct packet_io_thread_ctx *thread, struct tcp_restore_info *info, uint64_t session_id, struct route_ctx *c2s_route_ctx, struct route_ctx *s2c_route_ctx) { struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; struct ether_addr *client_mac = (struct ether_addr *)&packet_io->config.tap_c_mac; struct ether_addr *server_mac = (struct ether_addr *)&packet_io->config.tap_s_mac; void *logger = thread->logger; char buffer[1500]; struct metadata meta = {0}; meta.session_id = session_id; meta.is_decrypted = SET_TRAFFIC_IS_DECRYPTED(0); meta.is_ctrl_pkt = 0; meta.l7offset = 0; meta.sids.num = 2; meta.sids.elems[0] = acceptor_ctx->sce_sids; meta.sids.elems[1] = acceptor_ctx->proxy_sids; static tcp_handshake_fn *fn[3] = {tfe_tcp_restore_syn_packet, tfe_tcp_restore_synack_packet, tfe_tcp_restore_ack_packet}; marsio_buff_t *tx_buffs[3]; marsio_buff_malloc_global(packet_io->instance, tx_buffs, 3, 0, thread->thread_index); for (int i = 0; i < 3; i++) { meta.raw_len = fn[i](info, client_mac, server_mac, buffer, sizeof(buffer)); meta.raw_data = marsio_buff_append(tx_buffs[i], meta.raw_len); memcpy(meta.raw_data, buffer, meta.raw_len); switch (i) { case 0: /* fail through */ case 2: route_ctx_copy(&meta.route_ctx, c2s_route_ctx); break; case 1: route_ctx_copy(&meta.route_ctx, s2c_route_ctx); break; } packet_io_set_metadata(tx_buffs[i], &meta, logger); throughput_metrics_inc(&packet_io_fs->decrypt_tx, 1, meta.raw_len); } marsio_send_burst_with_options(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 3, MARSIO_SEND_OPT_REHASH); } // return 0 : success // return -1 : error static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { int ret = 0; int fd_downstream = 0; int fd_upstream = 0; int fd_fake_c = 0; int fd_fake_s = 0; uint16_t size = 0; uint8_t is_passthrough = 0; uint8_t hit_no_intercept = 0; uint16_t out_size = 0; char stream_traceid[24] = {0}; char reason_invalid_intercept_param[] = "Invalid Intercept Param"; char reason_invalid_tcp_policy_param[] = "Invalid tcp policy Param"; char reason_underlying_stream_error[] = "Underlying Stream Error"; unsigned int stream_common_direction; uint8_t stream_protocol_in_char = 0; uint8_t enable_decrypted_traffic_steering = 0; struct ethhdr *ether_hdr = NULL; struct session_ctx *s_ctx = NULL; struct addr_tuple4 inner_tuple4; struct tcp_restore_info restore_info; memset(&inner_tuple4, 0, sizeof(inner_tuple4)); memset(&restore_info, 0, sizeof(restore_info)); struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct packet_io *packet_io = thread->ref_io; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; void * logger = thread->logger; struct raw_pkt_parser raw_parser; raw_packet_parser_init(&raw_parser, meta->session_id, LAYER_TYPE_ALL, 8); const void *payload = raw_packet_parser_parse(&raw_parser, (const void *)meta->raw_data, meta->raw_len, logger); if ((char *)payload - meta->raw_data != meta->l7offset) { uint16_t offset = (char *)payload - meta->raw_data; TFE_LOG_ERROR(logger, "%s: incorrect dataoffset in the control zone of session %lu, offset:%u, l7offset:%u, payload:%p, raw_data:%p", LOG_TAG_PKTIO, meta->session_id, offset, meta->l7offset, payload, meta->raw_data); } raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_tuple4, logger); tfe_cmsg_get_value(parser->cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (unsigned char *)&stream_protocol_in_char, sizeof(stream_protocol_in_char), &size); ret = intercept_policy_enforce(thread->ref_proxy->int_ply_enforcer, parser->cmsg); if (ret != 0) { is_passthrough = 1; set_passthrough_reason(parser->cmsg, reason_invalid_intercept_param); goto passthrough; } if (parser->intercpet_data == 0) { tfe_cmsg_get_value(parser->cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept), &out_size); if (hit_no_intercept == 1) { is_passthrough = 1; goto passthrough; } ret = tcp_policy_enforce(thread->ref_proxy->tcp_ply_enforcer, parser->cmsg); if (ret != 0) { is_passthrough = 1; set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); goto passthrough; } for (int i = 0; i < parser->sce_policy_id_num; i++) { chaining_policy_enforce(thread->ref_proxy->chain_ply_enforcer, parser->cmsg, parser->sce_policy_ids[i]); } tcp_restore_set_from_cmsg(parser->cmsg, &restore_info); tcp_restore_set_from_pkg(&inner_tuple4, &restore_info); if (overwrite_tcp_mss(parser->cmsg, &restore_info, meta->session_id, logger)) { is_passthrough = 1; set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); goto passthrough; } tcp_restore_info_dump(&restore_info, meta->session_id, logger); // tcp repair C2S fd_upstream = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), packet_io->config.dev_tap, 0x65); if (fd_upstream < 0) { TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(UPSTREAM)", LOG_TAG_PKTIO, meta->session_id); is_passthrough = 1; set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); goto passthrough; } // tcp repair S2C fd_downstream = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client), packet_io->config.dev_tap, 0x65); if (fd_downstream < 0) { TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(DOWNSTREAM)", LOG_TAG_PKTIO, meta->session_id); is_passthrough = 1; set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); goto passthrough; } tfe_cmsg_get_value(parser->cmsg, TFE_CMSG_TCP_DECRYPTED_TRAFFIC_STEERING, (unsigned char *)&enable_decrypted_traffic_steering, sizeof(enable_decrypted_traffic_steering), &size); if ((STREAM_PROTO_PLAIN == (enum tfe_stream_proto)stream_protocol_in_char && thread->ref_proxy->traffic_steering_options.enable_steering_http) || (STREAM_PROTO_SSL == (enum tfe_stream_proto)stream_protocol_in_char && thread->ref_proxy->traffic_steering_options.enable_steering_ssl) || enable_decrypted_traffic_steering == 1) { packet_io_send_fake_pkt(thread, &restore_info, meta->session_id, &parser->seq_route_ctx, &parser->ack_route_ctx); fd_fake_c = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), thread->ref_proxy->traffic_steering_options.device_client, thread->ref_proxy->traffic_steering_options.so_mask_client); if (fd_fake_c < 0) { TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(fd_fake_c)", LOG_TAG_PKTIO, meta->session_id); is_passthrough = 1; set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); goto passthrough; } fd_fake_s = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client), thread->ref_proxy->traffic_steering_options.device_server, thread->ref_proxy->traffic_steering_options.so_mask_server); if (fd_fake_s < 0) { TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(fd_fake_s)", LOG_TAG_PKTIO, meta->session_id); is_passthrough = 1; set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); goto passthrough; } } stream_common_direction = meta->is_e2i_dir ? 'I' : 'E'; tfe_cmsg_set(parser->cmsg, TFE_CMSG_COMMON_DIRECTION, (const unsigned char *)&stream_common_direction, sizeof(stream_common_direction)); snprintf(stream_traceid, 24, "%" PRIu64, meta->session_id); tfe_cmsg_set(parser->cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char *)stream_traceid, strlen(stream_traceid)); tfe_cmsg_dup(parser->cmsg); // 为避免 packet IO thread 与 worker 访问 cmsg 时出现竞争,packet IO thread 必须在调用 tfe_proxy_fds_accept 之前 set cmsg if (tfe_proxy_fds_accept(thread->ref_proxy, fd_downstream, fd_upstream, fd_fake_c, fd_fake_s, parser->cmsg) < 0) { TFE_LOG_ERROR(logger, "%s: session %lu Failed at tfe_proxy_fds_accept()", LOG_TAG_PKTIO, meta->session_id); is_passthrough = 1; set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); goto passthrough; } } else if (parser->intercpet_data & (IS_SINGLE | IS_TUNNEL)) { is_passthrough = 1; set_passthrough_reason(parser->cmsg, reason_underlying_stream_error); } passthrough: s_ctx = session_ctx_new(); s_ctx->raw_meta_i2e = metadata_new(); s_ctx->raw_meta_e2i = metadata_new(); s_ctx->ctrl_meta = metadata_new(); s_ctx->protocol = stream_protocol_in_char; s_ctx->ref_thread_ctx = thread; s_ctx->session_id = meta->session_id; s_ctx->session_addr = addr_tuple4_to_str(&inner_tuple4); s_ctx->cmsg = parser->cmsg; s_ctx->is_passthrough = is_passthrough; metadata_deep_copy(s_ctx->ctrl_meta, meta); ether_hdr = (struct ethhdr *)(s_ctx->ctrl_meta->raw_data); memcpy(s_ctx->client_mac, ether_hdr->h_source, 6); memcpy(s_ctx->server_mac, ether_hdr->h_dest, 6); // c2s s_ctx->c2s_info.is_e2i_dir = meta->is_e2i_dir; s_ctx->c2s_info.tuple4 = inner_tuple4; // s2c s_ctx->s2c_info.is_e2i_dir = !meta->is_e2i_dir; addr_tuple4_reverse(&inner_tuple4, &s_ctx->s2c_info.tuple4); s_ctx->policy_ids = parser->tfe_policy_ids[0]; sids_copy(&s_ctx->ctrl_meta->sids, &meta->sids); route_ctx_copy(&s_ctx->ctrl_meta->route_ctx, &meta->route_ctx); if (parser->seq_len > 0) raw_traffic_decapsulate(&raw_parser, parser->seq_header, parser->seq_len, &s_ctx->c2s_info.header_data, &s_ctx->c2s_info.header_len, &s_ctx->c2s_info.is_ipv4); if (parser->ack_len > 0) raw_traffic_decapsulate(&raw_parser, parser->ack_header, parser->ack_len, &s_ctx->s2c_info.header_data, &s_ctx->s2c_info.header_len, &s_ctx->s2c_info.is_ipv4); if (s_ctx->c2s_info.is_e2i_dir) { sids_copy(&s_ctx->raw_meta_e2i->sids, &parser->seq_sids); route_ctx_copy(&s_ctx->raw_meta_e2i->route_ctx, &parser->seq_route_ctx); sids_copy(&s_ctx->raw_meta_i2e->sids, &parser->ack_sids); route_ctx_copy(&s_ctx->raw_meta_i2e->route_ctx, &parser->ack_route_ctx); } else { sids_copy(&s_ctx->raw_meta_i2e->sids, &parser->seq_sids); route_ctx_copy(&s_ctx->raw_meta_i2e->route_ctx, &parser->seq_route_ctx); sids_copy(&s_ctx->raw_meta_e2i->sids, &parser->ack_sids); route_ctx_copy(&s_ctx->raw_meta_e2i->route_ctx, &parser->ack_route_ctx); } TFE_LOG_INFO(logger, "%s: session %lu %s active first", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->session_addr); session_table_insert(thread->session_table, s_ctx->session_id, &(s_ctx->c2s_info.tuple4), s_ctx, session_value_free_cb); ATOMIC_INC(&(packet_io_fs->session_num)); if (parser->seq_header) FREE(&parser->seq_header); if (parser->ack_header) FREE(&parser->ack_header); return 0; } // return 0 : success // return -1 : error static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id); if (!node) { return handle_session_opening(meta, parser, thread_seq, ctx); } return 0; } // return 0 : success // return -1 : error static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; void * logger = thread->logger; struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id); if (node) { struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id); tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx->cmsg, 1, s_ctx->c2s_info.rx.n_pkts, s_ctx->c2s_info.rx.n_bytes, s_ctx->s2c_info.rx.n_pkts, s_ctx->s2c_info.rx.n_bytes, thread_seq, s_ctx->c2s_info.is_e2i_dir); session_table_delete_by_id(thread->session_table, meta->session_id); ATOMIC_DEC(&(packet_io_fs->session_num)); return 0; } return -1; } // return 0 : success // return -1 : error static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; void * logger = thread->logger; TFE_LOG_ERROR(logger, "%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id); ATOMIC_ZERO(&(packet_io_fs->session_num)); for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) { struct packet_io_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i]; __atomic_fetch_add(&thread_ctx->session_table_need_reset, 1, __ATOMIC_RELAXED); } return 0; } // return 0 : success // return -1 : error static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx) { struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; void * logger = thread->logger; struct metadata meta; if (packet_io_get_metadata(rx_buff, &meta, logger) == -1) { TFE_LOG_ERROR(logger, "%s: unexpected control packet, unable to get metadata\n\tMETA={session_id: %lu, raw_len: %d, is_e2i_dir: %d, is_ctrl_pkt: %d, l7offset: %d, is_decrypted: %u, sids_num: %d}", LOG_TAG_PKTIO, meta.session_id, meta.raw_len, meta.is_e2i_dir, meta.is_ctrl_pkt, meta.l7offset, meta.is_decrypted, meta.sids.num); return -1; } struct ctrl_pkt_parser ctrl_parser; ctrl_packet_parser_init(&ctrl_parser); if (ctrl_packet_parser_parse(&ctrl_parser, meta.raw_data + meta.l7offset, meta.raw_len - meta.l7offset, logger) == -1) { TFE_LOG_ERROR(logger, "%s: unexpected control packet, metadata's session %lu unable to parse data", LOG_TAG_PKTIO, meta.session_id); return -1; } if (ctrl_parser.session_id != meta.session_id) { TFE_LOG_ERROR(logger, "%s: unexpected control packet, metadata's session %lu != control packet's session %lu", LOG_TAG_PKTIO, meta.session_id, ctrl_parser.session_id); ctrl_packet_cmsg_destroy(&ctrl_parser); return -1; } switch (ctrl_parser.state) { case SESSION_STATE_OPENING: __atomic_fetch_add(&packet_io_fs->ctrl_pkt_opening_num, 1, __ATOMIC_RELAXED); // when session opening, firewall not send policy id // return handle_session_opening(&meta, &ctrl_parser, thread_seq, ctx); break; case SESSION_STATE_CLOSING: __atomic_fetch_add(&packet_io_fs->ctrl_pkt_closing_num, 1, __ATOMIC_RELAXED); return handle_session_closing(&meta, &ctrl_parser, thread_seq, ctx); case SESSION_STATE_ACTIVE: __atomic_fetch_add(&packet_io_fs->ctrl_pkt_active_num, 1, __ATOMIC_RELAXED); return handle_session_active(&meta, &ctrl_parser, thread_seq, ctx); case SESSION_STATE_RESETALL: __atomic_fetch_add(&packet_io_fs->ctrl_pkt_resetall_num, 1, __ATOMIC_RELAXED); return handle_session_resetall(&meta, &ctrl_parser, thread_seq, ctx); default: __atomic_fetch_add(&packet_io_fs->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED); break; } return 0; } static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx) { struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct packet_io *packet_io = thread->ref_io; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; int is_ipv4 = 0; uint8_t flag = 0; char *header = NULL; int header_len = 0; void * logger = thread->logger; int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); struct metadata meta; if (packet_io_get_metadata(rx_buff, &meta, logger) == -1) { TFE_LOG_ERROR(logger, "%s: unexpected control packet, unable to get metadata\n\tMETA={session_id: %lu, raw_len: %d, is_e2i_dir: %d, is_ctrl_pkt: %d, l7offset: %d, is_decrypted: %u, sids_num: %d}", LOG_TAG_PKTIO, meta.session_id, meta.raw_len, meta.is_e2i_dir, meta.is_ctrl_pkt, meta.l7offset, meta.is_decrypted, meta.sids.num); throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); return -1; } struct session_node *node = session_table_search_by_id(thread->session_table, meta.session_id); if (node == NULL) { throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len); if (thread->ref_acceptor_ctx->debug) { struct addr_tuple4 inner_addr; struct raw_pkt_parser raw_parser; memset(&inner_addr, 0, sizeof(struct addr_tuple4)); raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8); raw_packet_parser_parse(&raw_parser, (const void *)raw_data, raw_len, logger); raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr, logger); uint16_t ipid = raw_packet_parser_get_most_inner_ipid(&raw_parser); char *str = addr_tuple4_to_str(&inner_addr); TFE_LOG_ERROR(logger, "packet from nf %lu: %s (ipid: %u) miss session table", meta.session_id, str, ipid); free(str); } marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); return -1; } struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; if (s_ctx->is_passthrough > 0) { throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len); if (meta.is_e2i_dir == s_ctx->c2s_info.is_e2i_dir) throughput_metrics_inc(&s_ctx->c2s_info.rx, 1, raw_len); else throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len); flag = tfe_cmsg_get_flag(s_ctx->cmsg); if (flag & TFE_CMSG_FLAG_USER0) { send_event_log(s_ctx, thread_seq, ctx); tfe_cmsg_set_flag(s_ctx->cmsg, TFE_CMSG_FLAG_INIT); } marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); return 0; } if (meta.is_decrypted) { throughput_metrics_inc(&packet_io_fs->decrypt_rx, 1, raw_len); // c2s if (meta.is_e2i_dir == s_ctx->c2s_info.is_e2i_dir) { add_ether_header(raw_data, packet_io->config.tap_c_mac, packet_io->config.tap_s_mac); throughput_metrics_inc(&packet_io_fs->tap_s_pkt_tx, 1, raw_len); if (packet_io->config.enable_iouring) { io_uring_write(thread->tap_ctx->io_uring_s, raw_data, raw_len); } else { tap_write(thread->tap_ctx->tap_s, raw_data, raw_len, logger); } } // s2c else { add_ether_header(raw_data, packet_io->config.tap_s_mac, packet_io->config.tap_c_mac); throughput_metrics_inc(&packet_io_fs->tap_c_pkt_tx, 1, raw_len); if (packet_io->config.enable_iouring) { io_uring_write(thread->tap_ctx->io_uring_c, raw_data, raw_len); } else { tap_write(thread->tap_ctx->tap_c, raw_data, raw_len, logger); } } } else { throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); if (meta.is_e2i_dir) { if (metadata_is_empty(s_ctx->raw_meta_e2i)) { metadata_deep_copy(s_ctx->raw_meta_e2i, &meta); } s_ctx->raw_meta_e2i->sids = meta.sids; } else { if (metadata_is_empty(s_ctx->raw_meta_i2e)) { metadata_deep_copy(s_ctx->raw_meta_i2e, &meta); } s_ctx->raw_meta_i2e->sids = meta.sids; } if (meta.is_e2i_dir == s_ctx->c2s_info.is_e2i_dir) { header = s_ctx->c2s_info.header_data; header_len = s_ctx->c2s_info.header_len; is_ipv4 = s_ctx->c2s_info.is_ipv4; throughput_metrics_inc(&s_ctx->c2s_info.rx, 1, raw_len); } else { header = s_ctx->s2c_info.header_data; header_len = s_ctx->s2c_info.header_len; is_ipv4 = s_ctx->s2c_info.is_ipv4; throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len); } if (header != NULL) { char *packet_buff = NULL; int packet_len = sizeof(struct ethhdr) + raw_len - header_len; packet_buff = (char *)calloc(packet_len, sizeof(char)); memcpy(packet_buff + sizeof(struct ethhdr), raw_data + header_len, raw_len - header_len); add_ether_header(packet_buff, packet_io->config.src_mac, packet_io->config.tap_mac); if (is_ipv4) add_ether_proto(packet_buff, ETH_P_IP); else add_ether_proto(packet_buff, ETH_P_IPV6); if (packet_io->config.enable_iouring) { io_uring_write(thread->tap_ctx->io_uring_fd, packet_buff, packet_len); } else { tap_write(thread->tap_ctx->tap_fd, packet_buff, packet_len, logger); } throughput_metrics_inc(&packet_io_fs->tap_pkt_tx, 1, packet_len); if (packet_buff) free(packet_buff); } else { // send to tap0 add_ether_header(raw_data, packet_io->config.src_mac, packet_io->config.tap_mac); if (packet_io->config.enable_iouring) { io_uring_write(thread->tap_ctx->io_uring_fd, raw_data, raw_len); } else { tap_write(thread->tap_ctx->tap_fd, raw_data, raw_len, logger); } throughput_metrics_inc(&packet_io_fs->tap_pkt_tx, 1, raw_len); } flag = tfe_cmsg_get_flag(s_ctx->cmsg); if (flag & TFE_CMSG_FLAG_USER0) { send_event_log(s_ctx, thread_seq, ctx); tfe_cmsg_set_flag(s_ctx->cmsg, TFE_CMSG_FLAG_INIT); } } marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq); return 0; } /****************************************************************************** * EXTERN ******************************************************************************/ int is_enable_iouring(struct packet_io *handle) { return handle->config.enable_iouring; } void tfe_tap_ctx_destory(struct tap_ctx *handler) { if (handler) { io_uring_instance_destory(handler->io_uring_fd); io_uring_instance_destory(handler->io_uring_c); io_uring_instance_destory(handler->io_uring_s); if (handler->eventfd > 0) close(handler->eventfd); if (handler->eventfd_c > 0) close(handler->eventfd_c); if (handler->eventfd_s > 0) close(handler->eventfd_s); tap_close(handler->tap_fd); tap_close(handler->tap_c); tap_close(handler->tap_s); free(handler); handler = NULL; } } struct tap_ctx *tfe_tap_ctx_create(void *ctx) { int ret = 0; struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)ctx; struct packet_io *packet_io = thread_ctx->ref_io; struct tap_ctx *tap_ctx = (struct tap_ctx *)calloc(1, sizeof(struct tap_ctx)); assert(tap_ctx != NULL); tap_ctx->tap_fd = tap_open(packet_io->config.dev_tap, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE); tap_ctx->tap_c = tap_open(packet_io->config.dev_tap_c, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE); tap_ctx->tap_s = tap_open(packet_io->config.dev_tap_s, IFF_TAP | IFF_NO_PI | IFF_MULTI_QUEUE); tap_up_link(packet_io->config.dev_tap); tap_up_link(packet_io->config.dev_tap_c); tap_up_link(packet_io->config.dev_tap_s); // fcntl(2) EFD_NONBLOCK 不生效 tap_ctx->eventfd = eventfd(0, EFD_NONBLOCK); tap_ctx->eventfd_c = eventfd(0, EFD_NONBLOCK); tap_ctx->eventfd_s = eventfd(0, EFD_NONBLOCK); if (packet_io->config.enable_iouring) { bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_fd); bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_c); bpf_obj_attach(packet_io->config.tap_bpf_ctx, tap_ctx->tap_s); tap_ctx->io_uring_fd = io_uring_instance_create(tap_ctx->tap_fd, tap_ctx->eventfd, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog); if (tap_ctx->io_uring_fd == NULL) goto error_out; tap_ctx->io_uring_c = io_uring_instance_create(tap_ctx->tap_c, tap_ctx->eventfd_c, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog); if (tap_ctx->io_uring_c == NULL) goto error_out; tap_ctx->io_uring_s = io_uring_instance_create(tap_ctx->tap_s, tap_ctx->eventfd_s, packet_io->config.ring_size, packet_io->config.buff_size, packet_io->config.flags, packet_io->config.sq_thread_idle, packet_io->config.enable_debuglog); if (tap_ctx->io_uring_s == NULL) goto error_out; marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd, thread_ctx->thread_index); marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd_c, thread_ctx->thread_index); marsio_poll_register_eventfd(packet_io->instance, tap_ctx->eventfd_s, thread_ctx->thread_index); } if (packet_io->config.tap_rps_enable) { ret = tap_set_rps(packet_io->config.dev_tap, thread_ctx->thread_index, packet_io->config.tap_rps_mask); if (ret != 0) goto error_out; ret = tap_set_rps(packet_io->config.dev_tap_c, thread_ctx->thread_index, packet_io->config.tap_rps_mask); if (ret != 0) goto error_out; ret = tap_set_rps(packet_io->config.dev_tap_s, thread_ctx->thread_index, packet_io->config.tap_rps_mask); if (ret != 0) goto error_out; } return tap_ctx; error_out: tfe_tap_ctx_destory(tap_ctx); return NULL; } int packet_io_thread_init(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx, void *logger) { if (marsio_thread_init(handle->instance) != 0) { TFE_LOG_ERROR(logger, "%s: unable to init marsio thread %d", LOG_TAG_PKTIO, thread_ctx->thread_index); return -1; } return 0; } void packet_io_thread_wait(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx, int timeout_ms) { struct mr_vdev *vdevs[] = {handle->dev_nf_interface.mr_dev}; marsio_poll_wait(handle->instance, vdevs, 1, thread_ctx->thread_index, timeout_ms); } void packet_io_destory(struct packet_io *handle) { if (handle) { if (handle->dev_nf_interface.mr_path) { marsio_sendpath_destory(handle->dev_nf_interface.mr_path); handle->dev_nf_interface.mr_path = NULL; } if (handle->dev_nf_interface.mr_dev) { marsio_close_device(handle->dev_nf_interface.mr_dev); handle->dev_nf_interface.mr_dev = NULL; } if (handle->instance) { marsio_destory(handle->instance); handle->instance = NULL; } if (handle->config.tap_bpf_ctx) { bpf_obj_unload(handle->config.tap_bpf_ctx); handle->config.tap_bpf_ctx = NULL; } free(handle); handle = NULL; } } struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask, void *logger) { int opt = 1; struct packet_io *handle = (struct packet_io *)calloc(1, sizeof(struct packet_io)); assert(handle != NULL); handle->thread_num = thread_num; if (packet_io_config(profile, &(handle->config), logger) != 0) { goto error_out; } if (handle->config.tap_allow_mutilthread) { if (handle->config.bpf_hash_mode != 2 && handle->config.bpf_hash_mode != 4) { TFE_LOG_ERROR(logger, "%s: bpf_hash_mode[%d] invalid.", LOG_TAG_PKTIO, handle->config.bpf_hash_mode); goto error_out; } handle->config.tap_bpf_ctx = bpf_obj_load(handle->config.bpf_obj, thread_num, handle->config.bpf_hash_mode, handle->config.bpf_debug_log); if (handle->config.tap_bpf_ctx == NULL) { TFE_LOG_ERROR(logger, "%s: under mutilthread mode, Unable to load bpf object.", LOG_TAG_PKTIO); goto error_out; } } else if (thread_num > 1){ TFE_LOG_ERROR(logger, "%s: under tap mode, when disable tap_allow_mutilthread, only support one work thread.", LOG_TAG_PKTIO); goto error_out; } handle->instance = marsio_create(); if (handle->instance == NULL) { TFE_LOG_ERROR(logger, "%s: unable to create marsio instance", LOG_TAG_PKTIO); goto error_out; } if (marsio_option_set(handle->instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, coremask, sizeof(cpu_set_t)) != 0) { TFE_LOG_ERROR(logger, "%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO); goto error_out; } if (marsio_option_set(handle->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0) { TFE_LOG_ERROR(logger, "%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO); goto error_out; } if (marsio_init(handle->instance, handle->config.app_symbol) != 0) { TFE_LOG_ERROR(logger, "%s: unable to initialize marsio instance", LOG_TAG_PKTIO); goto error_out; } // Netwrok Function Interface handle->dev_nf_interface.mr_dev = marsio_open_device(handle->instance, handle->config.dev_nf_interface, handle->thread_num, handle->thread_num); if (handle->dev_nf_interface.mr_dev == NULL) { TFE_LOG_ERROR(logger, "%s: unable to open device %s", LOG_TAG_PKTIO, handle->config.dev_nf_interface); goto error_out; } handle->dev_nf_interface.mr_path = marsio_sendpath_create_by_vdev(handle->dev_nf_interface.mr_dev); if (handle->dev_nf_interface.mr_path == NULL) { TFE_LOG_ERROR(logger, "%s: unable to create sendpath for device %s", LOG_TAG_PKTIO, handle->config.dev_nf_interface); goto error_out; } return handle; error_out: packet_io_destory(handle); return NULL; } // return n_packet_recv int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx) { struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; marsio_buff_t *rx_buffs[RX_BURST_MAX]; // nr_recv <= rx_burst_max <= RX_BURST_MAX int nr_recv = marsio_recv_burst(handle->dev_nf_interface.mr_dev, thread_seq, rx_buffs, handle->config.rx_burst_max); if (nr_recv <= 0) { return 0; } if (handle->config.bypass_all_traffic == 1) { for (int i = 0; i < nr_recv; i++) { int raw_len = marsio_buff_datalen(rx_buffs[i]); throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_pkt_tx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len); } marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, rx_buffs, nr_recv); return nr_recv; } for (int j = 0; j < nr_recv; j++) { marsio_buff_t *rx_buff = rx_buffs[j]; int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); if (is_downstream_keepalive_packet(rx_buff)) { throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_pkt_tx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->keepalived_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->keepalived_pkt_tx, 1, raw_len); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); continue; } if (thread->ref_acceptor_ctx->debug) { struct addr_tuple4 inner_addr; struct raw_pkt_parser raw_parser; memset(&inner_addr, 0, sizeof(struct addr_tuple4)); raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8); raw_packet_parser_parse(&raw_parser, (const void *)raw_data, raw_len, thread->logger); raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr, thread->logger); uint16_t ipid = raw_packet_parser_get_most_inner_ipid(&raw_parser); char *str = addr_tuple4_to_str(&inner_addr); TFE_LOG_DEBUG(thread->logger, "recv packet %s (ipid: %u)", str, ipid); free(str); } if (marsio_buff_is_ctrlbuf(rx_buff)) { throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_pkt_tx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->ctrl_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->ctrl_pkt_tx, 1, raw_len); // all control packet need bypass handle_control_packet(handle, rx_buff, thread_seq, ctx); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); } else { handle_raw_packet_from_nf(handle, rx_buff, thread_seq, ctx); } } return nr_recv; } void handle_decryption_packet_from_tap(const char *data, int len, void *args) { struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)args; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; struct addr_tuple4 inner_addr; struct raw_pkt_parser raw_parser; void * logger = thread->logger; memset(&inner_addr, 0, sizeof(struct addr_tuple4)); raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8); raw_packet_parser_parse(&raw_parser, (const void *)data, len, logger); raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr, logger); throughput_metrics_inc(&packet_io_fs->decrypt_rx, 1, len); struct session_node *node = session_table_search_by_addr(thread->session_table, &inner_addr); if (node == NULL) { if (thread->ref_acceptor_ctx->debug) { char *str = addr_tuple4_to_str(&inner_addr); uint16_t ipid = raw_packet_parser_get_most_inner_ipid(&raw_parser); TFE_LOG_ERROR(logger, "decypted packet from tap %s (ipid: %u) miss session table", str, ipid); free(str); } throughput_metrics_inc(&packet_io_fs->decrypt_rxdrop, 1, len); return; } struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; marsio_buff_t *tx_buffs[1]; int alloc_ret = marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread->thread_index); if (alloc_ret < 0){ TFE_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret = %d, thread_seq = %d", alloc_ret, thread->thread_index); throughput_metrics_inc(&packet_io_fs->decrypt_rxdrop, 1, len); return; } char *dst = marsio_buff_append(tx_buffs[0], len); memcpy(dst, data, len); struct metadata meta = {0}; meta.session_id = s_ctx->session_id; meta.raw_data = dst; meta.raw_len = len; meta.is_decrypted = SET_TRAFFIC_IS_DECRYPTED(0); meta.is_ctrl_pkt = 0; meta.l7offset = 0; meta.sids.num = 2; meta.sids.elems[0] = acceptor_ctx->sce_sids; meta.sids.elems[1] = acceptor_ctx->proxy_sids; if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct addr_tuple4)) == 0) { meta.is_e2i_dir = s_ctx->c2s_info.is_e2i_dir; throughput_metrics_inc(&packet_io_fs->tap_c_pkt_rx, 1, len); } else { meta.is_e2i_dir = s_ctx->s2c_info.is_e2i_dir; throughput_metrics_inc(&packet_io_fs->tap_s_pkt_rx, 1, len); } if (meta.is_e2i_dir) { route_ctx_copy(&meta.route_ctx, &s_ctx->raw_meta_e2i->route_ctx); } else { route_ctx_copy(&meta.route_ctx, &s_ctx->raw_meta_i2e->route_ctx); } packet_io_set_metadata(tx_buffs[0], &meta, logger); throughput_metrics_inc(&packet_io_fs->decrypt_tx, 1, len); marsio_send_burst_with_options(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1, MARSIO_SEND_OPT_REHASH); } void handle_raw_packet_from_tap(const char *data, int len, void *args) { char *src_mac = NULL; char *dst_mac = NULL; struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)args; struct packet_io *packet_io = thread->ref_io; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; struct addr_tuple4 inner_addr; struct raw_pkt_parser raw_parser; struct metadata meta = {0}; void * logger = thread->logger; char *dst = NULL; char *header = NULL; int header_len = 0; int packet_len = 0; memset(&inner_addr, 0, sizeof(struct addr_tuple4)); raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8); raw_packet_parser_parse(&raw_parser, (const void *)data, len, logger); raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr, logger); throughput_metrics_inc(&packet_io_fs->tap_pkt_rx, 1, len); struct session_node *node = session_table_search_by_addr(thread->session_table, &inner_addr); if (node == NULL) { throughput_metrics_inc(&packet_io_fs->tap_pkt_rxdrop, 1, len); if (thread->ref_acceptor_ctx->debug) { char *str = addr_tuple4_to_str(&inner_addr); uint16_t ipid = raw_packet_parser_get_most_inner_ipid(&raw_parser); TFE_LOG_ERROR(logger, "raw packet from tap %s (ipid: %u) miss session table", str, ipid); free(str); } return; } struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; marsio_buff_t *tx_buffs[1]; int alloc_ret = marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread->thread_index); if (alloc_ret < 0){ TFE_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret = %d, thread_seq = %d", alloc_ret, thread->thread_index); throughput_metrics_inc(&packet_io_fs->tap_pkt_rxdrop, 1, len); return; } if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct addr_tuple4)) == 0) { meta.is_e2i_dir = s_ctx->c2s_info.is_e2i_dir; src_mac = s_ctx->client_mac; dst_mac = s_ctx->server_mac; header = s_ctx->c2s_info.header_data; header_len = s_ctx->c2s_info.header_len; } else { meta.is_e2i_dir = s_ctx->s2c_info.is_e2i_dir; src_mac = s_ctx->server_mac; dst_mac = s_ctx->client_mac; header = s_ctx->s2c_info.header_data; header_len = s_ctx->s2c_info.header_len; } if (meta.is_e2i_dir) { sids_copy(&meta.sids, &s_ctx->raw_meta_e2i->sids); route_ctx_copy(&meta.route_ctx, &s_ctx->raw_meta_e2i->route_ctx); } else { sids_copy(&meta.sids, &s_ctx->raw_meta_i2e->sids); route_ctx_copy(&meta.route_ctx, &s_ctx->raw_meta_i2e->route_ctx); } if (header != NULL) { packet_len = len + header_len - sizeof(struct ethhdr); dst = marsio_buff_append(tx_buffs[0], packet_len); memcpy(dst, header, header_len); memcpy(dst + header_len, data + sizeof(struct ethhdr), len - sizeof(struct ethhdr)); } else { packet_len = len; dst = marsio_buff_append(tx_buffs[0], len); memcpy(dst, data, len); } meta.session_id = s_ctx->session_id; meta.raw_data = dst; meta.raw_len = packet_len; meta.is_decrypted = 0; meta.is_ctrl_pkt = 0; meta.l7offset = 0; packet_io_set_metadata(tx_buffs[0], &meta, logger); add_ether_header(dst, src_mac, dst_mac); throughput_metrics_inc(&packet_io_fs->raw_pkt_tx, 1, packet_len); marsio_send_burst_with_options(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1, MARSIO_SEND_OPT_REHASH); }