TSG-18200 Packet IO不再通过dir方向判断c2s/s2c

This commit is contained in:
wangmenglan
2024-01-03 13:14:38 +08:00
parent 058a5e6e66
commit dae38c5144
4 changed files with 57 additions and 120 deletions

View File

@@ -38,7 +38,7 @@ struct tfe_fieldstat_metric_t
struct fieldstat_dynamic_instance *instance;
};
void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes, int thread_id, int dir_is_e2i);
void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int downstream_dir, int upstream_rx_pkts, int upstream_rx_bytes, int upstream_dir, int thread_id);
int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id);
struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger);
void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat);

View File

@@ -44,13 +44,14 @@ struct packet_io_thread_ctx
struct packet_info
{
int is_e2i_dir;
struct tuple4 tuple4;
char *header_data;
int header_len;
int is_ipv4;
int is_e2i_dir;
int header_len;
char *header_data;
struct tuple4 tuple4;
struct sids sids;
struct route_ctx route_ctx;
struct throughput_metrics rx;
};
@@ -62,14 +63,9 @@ struct session_ctx
uint8_t protocol;
char session_addr[128];
char client_mac[6];
char server_mac[6];
struct packet_info c2s_info;
struct packet_info s2c_info;
struct metadata *raw_meta_i2e;
struct metadata *raw_meta_e2i;
struct metadata *ctrl_meta;
struct tfe_cmsg *cmsg;

View File

@@ -4,7 +4,7 @@
#include "tfe_stream.h"
#include "tfe_resource.h"
void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes, int thread_id, int dir_is_e2i)
void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int downstream_dir, int upstream_rx_pkts, int upstream_rx_bytes, int upstream_dir, int thread_id)
{
int ret;
uint16_t out_size;
@@ -46,23 +46,29 @@ void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct t
// incoming : E2I 的流量
// outcoming : I2E 的流量
// first_ctr_packet_dir <==> client hello packet dir
if (dir_is_e2i == 1)
// 1: E2I 0:I2E
if (downstream_dir == 1)
{
in_pkts = downstream_rx_pkts;
in_bytes = downstream_rx_bytes;
out_pkts = upstream_rx_pkts;
out_bytes = upstream_rx_bytes;
}
else
{
in_pkts = upstream_rx_pkts;
in_bytes = upstream_rx_bytes;
out_pkts = downstream_rx_pkts;
out_bytes = downstream_rx_bytes;
}
if (upstream_dir == 1)
{
in_pkts = upstream_rx_pkts;
in_bytes = upstream_rx_bytes;
}
else
{
out_pkts = upstream_rx_pkts;
out_bytes = upstream_rx_bytes;
}
int nr_tags = 0;
struct fieldstat_tag temp_tags[TAG_MAX] = {0};

View File

@@ -308,18 +308,6 @@ static void session_ctx_free(struct session_ctx *ctx)
tfe_cmsg_destroy(&ctx->cmsg);
}
if (ctx->raw_meta_i2e)
{
metadata_free(ctx->raw_meta_i2e);
ctx->raw_meta_i2e = NULL;
}
if (ctx->raw_meta_e2i)
{
metadata_free(ctx->raw_meta_e2i);
ctx->raw_meta_e2i = NULL;
}
if (ctx->ctrl_meta)
{
metadata_free(ctx->ctrl_meta);
@@ -1089,8 +1077,6 @@ int raw_traffic_decapsulate(struct packet *handler, const char *raw_data, int ra
*is_ipv4 = l3_layer_record->type == LAYER_TYPE_IPV4 ? 1 : 0;
l2_tun_layer_record = packet_get_innermost_layer(handler, LAYER_TYPE_L2_TUN);
if (l2_tun_layer_record == NULL)
return -1;
*header_len = l3_layer_record->hdr_offset;
*header = (char *)calloc(*header_len, sizeof(char));
@@ -1254,8 +1240,6 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
passthrough:
s_ctx = session_ctx_new();
s_ctx->raw_meta_i2e = metadata_new();
s_ctx->raw_meta_e2i = metadata_new();
s_ctx->ctrl_meta = metadata_new();
s_ctx->protocol = stream_protocol_in_char;
@@ -1263,23 +1247,9 @@ passthrough:
s_ctx->session_id = meta->session_id;
tuple4_tostring(&inner_tuple4, s_ctx->session_addr, sizeof(s_ctx->session_addr));
s_ctx->cmsg = parser->cmsg;
s_ctx->policy_ids = parser->tfe_policy_ids[0];
s_ctx->is_passthrough = is_passthrough;
metadata_deep_copy(s_ctx->ctrl_meta, meta);
ether_hdr = (struct ethhdr *)(s_ctx->ctrl_meta->raw_data);
memcpy(s_ctx->client_mac, ether_hdr->h_source, 6);
memcpy(s_ctx->server_mac, ether_hdr->h_dest, 6);
// c2s
s_ctx->c2s_info.is_e2i_dir = meta->is_e2i_dir;
s_ctx->c2s_info.tuple4 = inner_tuple4;
// s2c
s_ctx->s2c_info.is_e2i_dir = !meta->is_e2i_dir;
tuple4_reverse(&inner_tuple4, &s_ctx->s2c_info.tuple4);
s_ctx->policy_ids = parser->tfe_policy_ids[0];
sids_copy(&s_ctx->ctrl_meta->sids, &meta->sids);
route_ctx_copy(&s_ctx->ctrl_meta->route_ctx, &meta->route_ctx);
@@ -1288,22 +1258,18 @@ passthrough:
if (parser->ack_len > 0)
raw_traffic_decapsulate(&pkt, parser->ack_header, parser->ack_len, &s_ctx->s2c_info.header_data, &s_ctx->s2c_info.header_len, &s_ctx->s2c_info.is_ipv4);
if (s_ctx->c2s_info.is_e2i_dir) {
sids_copy(&s_ctx->raw_meta_e2i->sids, &parser->seq_sids);
route_ctx_copy(&s_ctx->raw_meta_e2i->route_ctx, &parser->seq_route_ctx);
sids_copy(&s_ctx->raw_meta_i2e->sids, &parser->ack_sids);
route_ctx_copy(&s_ctx->raw_meta_i2e->route_ctx, &parser->ack_route_ctx);
}
else
{
sids_copy(&s_ctx->raw_meta_i2e->sids, &parser->seq_sids);
route_ctx_copy(&s_ctx->raw_meta_i2e->route_ctx, &parser->seq_route_ctx);
sids_copy(&s_ctx->raw_meta_e2i->sids, &parser->ack_sids);
route_ctx_copy(&s_ctx->raw_meta_e2i->route_ctx, &parser->ack_route_ctx);
}
// c2s
s_ctx->c2s_info.is_e2i_dir = meta->is_e2i_dir;
s_ctx->c2s_info.tuple4 = inner_tuple4;
sids_copy(&s_ctx->c2s_info.sids, &parser->seq_sids);
route_ctx_copy(&s_ctx->c2s_info.route_ctx, &parser->seq_route_ctx);
// s2c
tuple4_reverse(&inner_tuple4, &s_ctx->s2c_info.tuple4);
sids_copy(&s_ctx->s2c_info.sids, &parser->ack_sids);
route_ctx_copy(&s_ctx->s2c_info.route_ctx, &parser->ack_route_ctx);
TFE_LOG_INFO(logger, "%s: session %lu %s active first, hit rule %lu", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->session_addr, rule_id);
session_table_insert(thread->session_table, s_ctx->session_id, &(s_ctx->c2s_info.tuple4), s_ctx, session_value_free_cb);
ATOMIC_INC(&(packet_io_fs->session_num));
if (parser->seq_header)
@@ -1342,7 +1308,7 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser
{
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id);
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx->cmsg, 1, s_ctx->c2s_info.rx.n_pkts, s_ctx->c2s_info.rx.n_bytes, s_ctx->s2c_info.rx.n_pkts, s_ctx->s2c_info.rx.n_bytes, thread_seq, s_ctx->c2s_info.is_e2i_dir);
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx->cmsg, 1, s_ctx->c2s_info.rx.n_pkts, s_ctx->c2s_info.rx.n_bytes, s_ctx->c2s_info.is_e2i_dir, s_ctx->s2c_info.rx.n_pkts, s_ctx->s2c_info.rx.n_bytes, s_ctx->s2c_info.is_e2i_dir, thread_seq);
session_table_delete_by_id(thread->session_table, meta->session_id);
ATOMIC_DEC(&(packet_io_fs->session_num));
return 0;
@@ -1431,6 +1397,8 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct packet_io *packet_io = thread->ref_io;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
struct packet pkt;
struct tuple4 inner_addr;
int is_ipv4 = 0;
uint8_t flag = 0;
char *header = NULL;
@@ -1440,6 +1408,10 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
memset(&inner_addr, 0, sizeof(struct tuple4));
packet_parse(&pkt, (const char *)raw_data, raw_len);
packet_get_innermost_tuple4(&pkt, &inner_addr);
struct metadata meta;
if (packet_io_get_metadata(rx_buff, &meta, logger) == -1) {
TFE_LOG_ERROR(logger, "%s: unexpected control packet, unable to get metadata\n\tMETA={session_id: %lu, raw_len: %d, is_e2i_dir: %d, is_ctrl_pkt: %d, l7offset: %d, is_decrypted: %u, sids_num: %d}",
@@ -1452,9 +1424,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
if (thread->ref_acceptor_ctx->dup_packet_filter_enable == 1)
{
struct packet packet;
packet_parse(&packet, (const char *)raw_data, raw_len);
if (search_packet_from_dablooms(&packet, thread->dup_packet_filter) == 1)
if (search_packet_from_dablooms(&pkt, thread->dup_packet_filter) == 1)
{
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
@@ -1471,11 +1441,6 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
if (thread->ref_acceptor_ctx->debug)
{
struct tuple4 inner_addr;
struct packet pkt;
memset(&inner_addr, 0, sizeof(struct tuple4));
packet_parse(&pkt, (const char *)raw_data, raw_len);
packet_get_innermost_tuple4(&pkt, &inner_addr);
uint16_t ipid = 0;
const struct layer_record *ipv4_layer = packet_get_innermost_layer(&pkt, LAYER_TYPE_IPV4);
if (ipv4_layer)
@@ -1496,10 +1461,13 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
if (s_ctx->is_passthrough > 0) {
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
if (meta.is_e2i_dir == s_ctx->c2s_info.is_e2i_dir)
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
throughput_metrics_inc(&s_ctx->c2s_info.rx, 1, raw_len);
else
}
else {
throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len);
}
flag = tfe_cmsg_get_flag(s_ctx->cmsg);
if (flag & TFE_CMSG_FLAG_USER0) {
@@ -1513,8 +1481,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
if (meta.is_decrypted)
{
throughput_metrics_inc(&packet_io_fs->decrypt_rx, 1, raw_len);
// c2s
if (meta.is_e2i_dir == s_ctx->c2s_info.is_e2i_dir) {
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
add_ether_header(raw_data, packet_io->config.tap_c_mac, packet_io->config.tap_s_mac);
throughput_metrics_inc(&packet_io_fs->tap_s_pkt_tx, 1, raw_len);
if (packet_io->config.enable_iouring) {
@@ -1524,7 +1491,6 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
tap_write(thread->tap_ctx->tap_s, raw_data, raw_len, logger);
}
}
// s2c
else {
add_ether_header(raw_data, packet_io->config.tap_s_mac, packet_io->config.tap_c_mac);
throughput_metrics_inc(&packet_io_fs->tap_c_pkt_tx, 1, raw_len);
@@ -1539,26 +1505,17 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
else
{
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
if (meta.is_e2i_dir) {
if (metadata_is_empty(s_ctx->raw_meta_e2i)) {
metadata_deep_copy(s_ctx->raw_meta_e2i, &meta);
}
s_ctx->raw_meta_e2i->sids = meta.sids;
}
else {
if (metadata_is_empty(s_ctx->raw_meta_i2e)) {
metadata_deep_copy(s_ctx->raw_meta_i2e, &meta);
}
s_ctx->raw_meta_i2e->sids = meta.sids;
}
if (meta.is_e2i_dir == s_ctx->c2s_info.is_e2i_dir) {
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
s_ctx->c2s_info.sids = meta.sids;
s_ctx->c2s_info.is_e2i_dir = meta.is_e2i_dir;
header = s_ctx->c2s_info.header_data;
header_len = s_ctx->c2s_info.header_len;
is_ipv4 = s_ctx->c2s_info.is_ipv4;
throughput_metrics_inc(&s_ctx->c2s_info.rx, 1, raw_len);
}
else {
s_ctx->s2c_info.sids = meta.sids;
s_ctx->s2c_info.is_e2i_dir = meta.is_e2i_dir;
header = s_ctx->s2c_info.header_data;
header_len = s_ctx->s2c_info.header_len;
is_ipv4 = s_ctx->s2c_info.is_ipv4;
@@ -1972,22 +1929,14 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args)
meta.sids.elems[1] = acceptor_ctx->proxy_sids;
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
meta.is_e2i_dir = s_ctx->c2s_info.is_e2i_dir;
meta.route_ctx = s_ctx->c2s_info.route_ctx;
throughput_metrics_inc(&packet_io_fs->tap_c_pkt_rx, 1, len);
}
else {
meta.is_e2i_dir = s_ctx->s2c_info.is_e2i_dir;
meta.route_ctx = s_ctx->s2c_info.route_ctx;
throughput_metrics_inc(&packet_io_fs->tap_s_pkt_rx, 1, len);
}
if (meta.is_e2i_dir)
{
route_ctx_copy(&meta.route_ctx, &s_ctx->raw_meta_e2i->route_ctx);
}
else
{
route_ctx_copy(&meta.route_ctx, &s_ctx->raw_meta_i2e->route_ctx);
}
packet_io_set_metadata(tx_buffs[0], &meta, logger);
throughput_metrics_inc(&packet_io_fs->decrypt_tx, 1, len);
marsio_send_burst_with_options(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1, MARSIO_SEND_OPT_REHASH);
@@ -2047,30 +1996,17 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0)
{
meta.is_e2i_dir = s_ctx->c2s_info.is_e2i_dir;
src_mac = s_ctx->client_mac;
dst_mac = s_ctx->server_mac;
header = s_ctx->c2s_info.header_data;
header_len = s_ctx->c2s_info.header_len;
meta.sids = s_ctx->c2s_info.sids;
meta.route_ctx = s_ctx->c2s_info.route_ctx;
}
else
{
meta.is_e2i_dir = s_ctx->s2c_info.is_e2i_dir;
src_mac = s_ctx->server_mac;
dst_mac = s_ctx->client_mac;
header = s_ctx->s2c_info.header_data;
header_len = s_ctx->s2c_info.header_len;
}
if (meta.is_e2i_dir)
{
sids_copy(&meta.sids, &s_ctx->raw_meta_e2i->sids);
route_ctx_copy(&meta.route_ctx, &s_ctx->raw_meta_e2i->route_ctx);
}
else
{
sids_copy(&meta.sids, &s_ctx->raw_meta_i2e->sids);
route_ctx_copy(&meta.route_ctx, &s_ctx->raw_meta_i2e->route_ctx);
meta.sids = s_ctx->s2c_info.sids;
meta.route_ctx = s_ctx->s2c_info.route_ctx;
}
if (header != NULL) {
@@ -2093,7 +2029,6 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
meta.l7offset = 0;
packet_io_set_metadata(tx_buffs[0], &meta, logger);
add_ether_header(dst, src_mac, dst_mac);
throughput_metrics_inc(&packet_io_fs->raw_pkt_tx, 1, packet_len);
if (thread->ref_acceptor_ctx->dup_packet_filter_enable == 1)