TSG-18285 TFE的Packet IO模块支持重复流量识别

This commit is contained in:
luwenpeng
2023-12-29 17:25:18 +08:00
parent 9d3dcce1ab
commit cbd98507a2
30 changed files with 4064 additions and 1572 deletions

View File

@@ -1,7 +1,6 @@
#include <assert.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <netinet/tcp.h>
#include <netinet/ether.h>
#include <linux/if_tun.h>
#include <sys/eventfd.h>
@@ -18,7 +17,9 @@
#include <time.h>
#include "tfe_ctrl_packet.h"
#include "tfe_raw_packet.h"
#include "packet.h"
#include "ipv4_helpers.h"
#include "tcp_helpers.h"
#include "io_uring.h"
#include "tfe_packet_io_fs.h"
#include "tfe_cmsg.h"
@@ -31,7 +32,8 @@
#include "tfe_session_table.h"
#include "tfe_packet_io.h"
#include "tfe_fieldstat.h"
#include "dablooms.h"
#include "timestamp.h"
/******************************************************************************
* Struct
@@ -134,10 +136,87 @@ struct metadata
struct route_ctx route_ctx;
};
struct packet_identify
{
// TCP
uint32_t tcp_seq;
uint32_t tcp_ack;
uint16_t sport;
uint16_t dport;
uint16_t tcp_checksum;
// IPv4
uint16_t ip_id;
uint32_t ip_src;
uint32_t ip_dst;
} __attribute__((__packed__));
extern int tcp_policy_enforce(struct tcp_policy_enforcer *tcp_enforcer, struct tfe_cmsg *cmsg);
extern int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstream, int fd_fake_c, int fd_fake_s, struct tfe_cmsg * cmsg);
extern void chaining_policy_enforce(struct chaining_policy_enforcer *enforcer, struct tfe_cmsg *cmsg, uint64_t rule_id);
/******************************************************************************
* dup packet filter
******************************************************************************/
// return 0: success
// reutrn -1: error
static int get_packet_identify(struct packet *packet, struct packet_identify *key)
{
const struct layer_record *l3_layer_record = packet_get_innermost_layer(packet, LAYER_TYPE_IPV4);
if (l3_layer_record == NULL)
{
return -1;
}
const struct layer_record *l4_layer_record = packet_get_innermost_layer(packet, LAYER_TYPE_TCP);
if (l4_layer_record == NULL)
{
return -1;
}
const struct ip *iphdr = (const struct ip *)l3_layer_record->hdr_ptr;
const struct tcphdr *tcphdr = (const struct tcphdr *)l4_layer_record->hdr_ptr;
memset(key, 0, sizeof(struct packet_identify));
key->tcp_seq = tcp_hdr_get_seq(tcphdr);
key->tcp_ack = tcp_hdr_get_ack(tcphdr);
key->sport = tcp_hdr_get_sport(tcphdr);
key->dport = tcp_hdr_get_dport(tcphdr);
key->tcp_checksum = tcp_hdr_get_checksum(tcphdr);
key->ip_id = ipv4_hdr_get_ipid(iphdr);
key->ip_src = ipv4_hdr_get_src(iphdr);
key->ip_dst = ipv4_hdr_get_dst(iphdr);
return 0;
}
static void add_packet_to_dablooms(struct packet *packet, struct expiry_dablooms_handle *handle)
{
struct packet_identify identify;
if (get_packet_identify(packet, &identify) == -1)
{
return;
}
expiry_dablooms_add(handle, (const char *)&identify, sizeof(struct packet_identify), timestamp_get_sec());
}
// return 1: hit
// reutrn 0: no hit
static int search_packet_from_dablooms(struct packet *packet, struct expiry_dablooms_handle *handle)
{
struct packet_identify identify;
if (get_packet_identify(packet, &identify) == -1)
{
return 0;
}
if (expiry_dablooms_search(handle, (const char *)&identify, sizeof(struct packet_identify), timestamp_get_sec()) == 1)
{
return 1;
}
return 0;
}
/******************************************************************************
* STATIC
******************************************************************************/
@@ -224,12 +303,6 @@ static void session_ctx_free(struct session_ctx *ctx)
{
if (ctx)
{
if (ctx->session_addr)
{
free(ctx->session_addr);
ctx->session_addr = NULL;
}
if (ctx->cmsg)
{
tfe_cmsg_destroy(&ctx->cmsg);
@@ -449,9 +522,9 @@ static int tcp_restore_set_from_cmsg(struct tfe_cmsg *cmsg, struct tcp_restore_i
return 0;
}
static int tcp_restore_set_from_pkg(struct addr_tuple4 *tuple4, struct tcp_restore_info *restore_info)
static int tcp_restore_set_from_pkg(struct tuple4 *tuple4, struct tcp_restore_info *restore_info)
{
if (tuple4->addr_type == ADDR_TUPLE4_TYPE_V4)
if (tuple4->ip_type == IP_TYPE_V4)
{
struct sockaddr_in *in_addr_client;
struct sockaddr_in *in_addr_server;
@@ -468,15 +541,15 @@ static int tcp_restore_set_from_pkg(struct addr_tuple4 *tuple4, struct tcp_resto
}
in_addr_client->sin_family = AF_INET;
in_addr_client->sin_addr = tuple4->addr_v4.src_addr;
in_addr_client->sin_addr = tuple4->src_addr.v4;
in_addr_client->sin_port = tuple4->src_port;
in_addr_server->sin_family = AF_INET;
in_addr_server->sin_addr = tuple4->addr_v4.dst_addr;
in_addr_server->sin_addr = tuple4->dst_addr.v4;
in_addr_server->sin_port = tuple4->dst_port;
}
if (tuple4->addr_type == ADDR_TUPLE4_TYPE_V6)
if (tuple4->ip_type == IP_TYPE_V6)
{
struct sockaddr_in6 *in6_addr_client;
struct sockaddr_in6 *in6_addr_server;
@@ -493,11 +566,11 @@ static int tcp_restore_set_from_pkg(struct addr_tuple4 *tuple4, struct tcp_resto
}
in6_addr_client->sin6_family = AF_INET6;
in6_addr_client->sin6_addr = tuple4->addr_v6.src_addr;
in6_addr_client->sin6_addr = tuple4->src_addr.v6;
in6_addr_client->sin6_port = tuple4->src_port;
in6_addr_server->sin6_family = AF_INET6;
in6_addr_server->sin6_addr = tuple4->addr_v6.dst_addr;
in6_addr_server->sin6_addr = tuple4->dst_addr.v6;
in6_addr_server->sin6_port = tuple4->dst_port;
}
@@ -997,6 +1070,34 @@ static void packet_io_send_fake_pkt(struct packet_io_thread_ctx *thread, struct
marsio_send_burst_with_options(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 3, MARSIO_SEND_OPT_REHASH);
}
int raw_traffic_decapsulate(struct packet *handler, const char *raw_data, int raw_len, char **header, int *header_len, int *is_ipv4)
{
const struct layer_record *l2_tun_layer_record = NULL;
const struct layer_record *l3_layer_record = NULL;
const struct layer_record *l4_layer_record = NULL;
l4_layer_record = packet_get_innermost_layer(handler, LAYER_TYPE_L4);
if (l4_layer_record == NULL)
return -1;
if (l4_layer_record->type != LAYER_TYPE_TCP)
return -1;
l3_layer_record = packet_get_innermost_layer(handler, LAYER_TYPE_L3);
if (l3_layer_record == NULL)
return -1;
*is_ipv4 = l3_layer_record->type == LAYER_TYPE_IPV4 ? 1 : 0;
l2_tun_layer_record = packet_get_innermost_layer(handler, LAYER_TYPE_L2_TUN);
if (l2_tun_layer_record == NULL)
return -1;
*header_len = l3_layer_record->hdr_offset;
*header = (char *)calloc(*header_len, sizeof(char));
memcpy(*header, raw_data, *header_len);
return 0;
}
// return 0 : success
// return -1 : error
static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
@@ -1021,7 +1122,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
uint8_t enable_decrypted_traffic_steering = 0;
struct ethhdr *ether_hdr = NULL;
struct session_ctx *s_ctx = NULL;
struct addr_tuple4 inner_tuple4;
struct tuple4 inner_tuple4;
struct tcp_restore_info restore_info;
memset(&inner_tuple4, 0, sizeof(inner_tuple4));
memset(&restore_info, 0, sizeof(restore_info));
@@ -1031,15 +1132,14 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
void * logger = thread->logger;
struct raw_pkt_parser raw_parser;
raw_packet_parser_init(&raw_parser, meta->session_id, LAYER_TYPE_ALL, 8);
const void *payload = raw_packet_parser_parse(&raw_parser, (const void *)meta->raw_data, meta->raw_len, logger);
struct packet pkt;
const void *payload = packet_parse(&pkt, (const char *)meta->raw_data, meta->raw_len);
if ((char *)payload - meta->raw_data != meta->l7offset)
{
uint16_t offset = (char *)payload - meta->raw_data;
TFE_LOG_ERROR(logger, "%s: incorrect dataoffset in the control zone of session %lu, offset:%u, l7offset:%u, payload:%p, raw_data:%p", LOG_TAG_PKTIO, meta->session_id, offset, meta->l7offset, payload, meta->raw_data);
}
raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_tuple4, logger);
packet_get_innermost_tuple4(&pkt, &inner_tuple4);
tfe_cmsg_get_value(parser->cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (unsigned char *)&stream_protocol_in_char, sizeof(stream_protocol_in_char), &size);
uint64_t rule_id = 0;
@@ -1161,7 +1261,7 @@ passthrough:
s_ctx->protocol = stream_protocol_in_char;
s_ctx->ref_thread_ctx = thread;
s_ctx->session_id = meta->session_id;
s_ctx->session_addr = addr_tuple4_to_str(&inner_tuple4);
tuple4_tostring(&inner_tuple4, s_ctx->session_addr, sizeof(s_ctx->session_addr));
s_ctx->cmsg = parser->cmsg;
s_ctx->is_passthrough = is_passthrough;
metadata_deep_copy(s_ctx->ctrl_meta, meta);
@@ -1176,7 +1276,7 @@ passthrough:
// s2c
s_ctx->s2c_info.is_e2i_dir = !meta->is_e2i_dir;
addr_tuple4_reverse(&inner_tuple4, &s_ctx->s2c_info.tuple4);
tuple4_reverse(&inner_tuple4, &s_ctx->s2c_info.tuple4);
s_ctx->policy_ids = parser->tfe_policy_ids[0];
@@ -1184,9 +1284,9 @@ passthrough:
route_ctx_copy(&s_ctx->ctrl_meta->route_ctx, &meta->route_ctx);
if (parser->seq_len > 0)
raw_traffic_decapsulate(&raw_parser, parser->seq_header, parser->seq_len, &s_ctx->c2s_info.header_data, &s_ctx->c2s_info.header_len, &s_ctx->c2s_info.is_ipv4);
raw_traffic_decapsulate(&pkt, parser->seq_header, parser->seq_len, &s_ctx->c2s_info.header_data, &s_ctx->c2s_info.header_len, &s_ctx->c2s_info.is_ipv4);
if (parser->ack_len > 0)
raw_traffic_decapsulate(&raw_parser, parser->ack_header, parser->ack_len, &s_ctx->s2c_info.header_data, &s_ctx->s2c_info.header_len, &s_ctx->s2c_info.is_ipv4);
raw_traffic_decapsulate(&pkt, parser->ack_header, parser->ack_len, &s_ctx->s2c_info.header_data, &s_ctx->s2c_info.header_len, &s_ctx->s2c_info.is_ipv4);
if (s_ctx->c2s_info.is_e2i_dir) {
sids_copy(&s_ctx->raw_meta_e2i->sids, &parser->seq_sids);
@@ -1350,6 +1450,20 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
return -1;
}
if (thread->ref_acceptor_ctx->dup_packet_filter_enable == 1)
{
struct packet packet;
packet_parse(&packet, (const char *)raw_data, raw_len);
if (search_packet_from_dablooms(&packet, thread->dup_packet_filter) == 1)
{
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->dup_bypass, 1, raw_len);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1;
}
}
struct session_node *node = session_table_search_by_id(thread->session_table, meta.session_id);
if (node == NULL) {
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
@@ -1357,17 +1471,20 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
if (thread->ref_acceptor_ctx->debug)
{
struct addr_tuple4 inner_addr;
struct raw_pkt_parser raw_parser;
memset(&inner_addr, 0, sizeof(struct addr_tuple4));
raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8);
raw_packet_parser_parse(&raw_parser, (const void *)raw_data, raw_len, logger);
raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr, logger);
uint16_t ipid = raw_packet_parser_get_most_inner_ipid(&raw_parser);
char *str = addr_tuple4_to_str(&inner_addr);
TFE_LOG_ERROR(logger, "packet from nf %lu: %s (ipid: %u) miss session table", meta.session_id, str, ipid);
free(str);
struct tuple4 inner_addr;
struct packet pkt;
memset(&inner_addr, 0, sizeof(struct tuple4));
packet_parse(&pkt, (const char *)raw_data, raw_len);
packet_get_innermost_tuple4(&pkt, &inner_addr);
uint16_t ipid = 0;
const struct layer_record *ipv4_layer = packet_get_innermost_layer(&pkt, LAYER_TYPE_IPV4);
if (ipv4_layer)
{
ipid = ipv4_hdr_get_ipid((const struct ip *)ipv4_layer->hdr_ptr);
}
char buffer[128] = {0};
tuple4_tostring(&inner_addr, buffer, sizeof(buffer));
TFE_LOG_ERROR(logger, "packet from nf %lu: %s (ipid: %u) miss session table", meta.session_id, buffer, ipid);
}
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
@@ -1759,17 +1876,20 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi
if (thread->ref_acceptor_ctx->debug)
{
struct addr_tuple4 inner_addr;
struct raw_pkt_parser raw_parser;
memset(&inner_addr, 0, sizeof(struct addr_tuple4));
raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8);
raw_packet_parser_parse(&raw_parser, (const void *)raw_data, raw_len, thread->logger);
raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr, thread->logger);
uint16_t ipid = raw_packet_parser_get_most_inner_ipid(&raw_parser);
char *str = addr_tuple4_to_str(&inner_addr);
TFE_LOG_DEBUG(thread->logger, "recv packet %s (ipid: %u)", str, ipid);
free(str);
struct tuple4 inner_addr;
struct packet pkt;
memset(&inner_addr, 0, sizeof(struct tuple4));
packet_parse(&pkt, (const char *)raw_data, raw_len);
packet_get_innermost_tuple4(&pkt, &inner_addr);
uint16_t ipid = 0;
const struct layer_record *ipv4_layer = packet_get_innermost_layer(&pkt, LAYER_TYPE_IPV4);
if (ipv4_layer)
{
ipid = ipv4_hdr_get_ipid((const struct ip *)ipv4_layer->hdr_ptr);
}
char buffer[128] = {0};
tuple4_tostring(&inner_addr, buffer, sizeof(buffer));
TFE_LOG_DEBUG(thread->logger, "recv packet %s (ipid: %u)", buffer, ipid);
}
if (marsio_buff_is_ctrlbuf(rx_buff))
@@ -1798,14 +1918,13 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args)
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
struct addr_tuple4 inner_addr;
struct raw_pkt_parser raw_parser;
struct tuple4 inner_addr;
struct packet pkt;
void * logger = thread->logger;
memset(&inner_addr, 0, sizeof(struct addr_tuple4));
raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8);
raw_packet_parser_parse(&raw_parser, (const void *)data, len, logger);
raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr, logger);
memset(&inner_addr, 0, sizeof(struct tuple4));
packet_parse(&pkt, (const char *)data, len);
packet_get_innermost_tuple4(&pkt, &inner_addr);
throughput_metrics_inc(&packet_io_fs->decrypt_rx, 1, len);
@@ -1813,10 +1932,15 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args)
if (node == NULL) {
if (thread->ref_acceptor_ctx->debug)
{
char *str = addr_tuple4_to_str(&inner_addr);
uint16_t ipid = raw_packet_parser_get_most_inner_ipid(&raw_parser);
TFE_LOG_ERROR(logger, "decypted packet from tap %s (ipid: %u) miss session table", str, ipid);
free(str);
char buffer[128] = {0};
tuple4_tostring(&inner_addr, buffer, sizeof(buffer));
int16_t ipid = 0;
const struct layer_record *ipv4_layer = packet_get_innermost_layer(&pkt, LAYER_TYPE_IPV4);
if (ipv4_layer)
{
ipid = ipv4_hdr_get_ipid((const struct ip *)ipv4_layer->hdr_ptr);
}
TFE_LOG_ERROR(logger, "decypted packet from tap %s (ipid: %u) miss session table", buffer, ipid);
}
throughput_metrics_inc(&packet_io_fs->decrypt_rxdrop, 1, len);
@@ -1847,7 +1971,7 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args)
meta.sids.elems[0] = acceptor_ctx->sce_sids;
meta.sids.elems[1] = acceptor_ctx->proxy_sids;
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct addr_tuple4)) == 0) {
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
meta.is_e2i_dir = s_ctx->c2s_info.is_e2i_dir;
throughput_metrics_inc(&packet_io_fs->tap_c_pkt_rx, 1, len);
}
@@ -1876,8 +2000,8 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)args;
struct packet_io *packet_io = thread->ref_io;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
struct addr_tuple4 inner_addr;
struct raw_pkt_parser raw_parser;
struct tuple4 inner_addr;
struct packet pkt;
struct metadata meta = {0};
void * logger = thread->logger;
char *dst = NULL;
@@ -1885,10 +2009,9 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
int header_len = 0;
int packet_len = 0;
memset(&inner_addr, 0, sizeof(struct addr_tuple4));
raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8);
raw_packet_parser_parse(&raw_parser, (const void *)data, len, logger);
raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr, logger);
memset(&inner_addr, 0, sizeof(struct tuple4));
packet_parse(&pkt, (const char *)data, len);
packet_get_innermost_tuple4(&pkt, &inner_addr);
throughput_metrics_inc(&packet_io_fs->tap_pkt_rx, 1, len);
@@ -1898,10 +2021,15 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
if (thread->ref_acceptor_ctx->debug)
{
char *str = addr_tuple4_to_str(&inner_addr);
uint16_t ipid = raw_packet_parser_get_most_inner_ipid(&raw_parser);
TFE_LOG_ERROR(logger, "raw packet from tap %s (ipid: %u) miss session table", str, ipid);
free(str);
char buffer[128] = {0};
tuple4_tostring(&inner_addr, buffer, sizeof(buffer));
uint16_t ipid = 0;
const struct layer_record *ipv4_layer = packet_get_innermost_layer(&pkt, LAYER_TYPE_IPV4);
if (ipv4_layer)
{
ipid = ipv4_hdr_get_ipid((const struct ip *)ipv4_layer->hdr_ptr);
}
TFE_LOG_ERROR(logger, "raw packet from tap %s (ipid: %u) miss session table", buffer, ipid);
}
return;
@@ -1917,7 +2045,7 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
return;
}
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct addr_tuple4)) == 0)
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0)
{
meta.is_e2i_dir = s_ctx->c2s_info.is_e2i_dir;
src_mac = s_ctx->client_mac;
@@ -1967,5 +2095,11 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
packet_io_set_metadata(tx_buffs[0], &meta, logger);
add_ether_header(dst, src_mac, dst_mac);
throughput_metrics_inc(&packet_io_fs->raw_pkt_tx, 1, packet_len);
if (thread->ref_acceptor_ctx->dup_packet_filter_enable == 1)
{
add_packet_to_dablooms(&pkt, thread->dup_packet_filter);
}
marsio_send_burst_with_options(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1, MARSIO_SEND_OPT_REHASH);
}