TSG-19723 Support Datapath Packet Trace

This commit is contained in:
wangmenglan
2024-03-01 19:19:56 +08:00
parent eb70e87583
commit bbe6cbe848
3 changed files with 63 additions and 10 deletions

View File

@@ -0,0 +1,24 @@
#ifndef _TFE_DP_TRACE_H
#define _TFE_DP_TRACE_H
#ifdef __cpluscplus
extern "C"
{
#endif
#include <marsio.h>
#define DP_TRACE_INFO(mr_ins, mr_buff, module, fmt, ...) do{ \
if (marsio_dp_trace_record_can_emit(mr_buff)) \
{ \
marsio_dp_trace_record_emit_fmt(mr_ins, mr_buff, module, fmt, ##__VA_ARGS__); \
} \
}while(0)
#ifdef __cpluscplus
}
#endif
#endif

View File

@@ -58,7 +58,7 @@ struct packet_info
struct session_ctx
{
int policy_ids;
uint64_t policy_ids;
uint64_t session_id;
uint8_t is_passthrough;
uint8_t protocol;

View File

@@ -34,6 +34,7 @@
#include "tfe_fieldstat.h"
#include "dablooms.h"
#include "timestamp.h"
#include "tfe_dp_trace.h"
/******************************************************************************
* Struct
@@ -1074,7 +1075,7 @@ int raw_traffic_decapsulate(const char *raw_data, int raw_len, struct packet_inf
// return 0 : success
// return -1 : error
static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
int ret = 0;
int fd_downstream = 0;
@@ -1121,6 +1122,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
{
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_intercept_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_intercept_param);
goto passthrough;
}
tfe_cmsg_set(parser->cmsg, TFE_CMSG_POLICY_ID, (const unsigned char *)&rule_id, sizeof(uint64_t));
@@ -1129,6 +1131,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
if (ret != 0) {
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_intercept_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_intercept_param);
goto passthrough;
}
if (parser->intercpet_data == 0) {
@@ -1136,6 +1139,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
if (hit_no_intercept == 1) {
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_no_intercept_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_no_intercept_param);
goto passthrough;
}
@@ -1143,6 +1147,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
if (ret != 0) {
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param);
goto passthrough;
}
@@ -1156,6 +1161,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
if (overwrite_tcp_mss(parser->cmsg, &restore_info, meta->session_id, logger)) {
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param);
goto passthrough;
}
tcp_restore_info_dump(&restore_info, meta->session_id, logger);
@@ -1166,6 +1172,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(UPSTREAM)", LOG_TAG_PKTIO, meta->session_id);
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param);
goto passthrough;
}
@@ -1176,6 +1183,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(DOWNSTREAM)", LOG_TAG_PKTIO, meta->session_id);
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param);
goto passthrough;
}
@@ -1192,6 +1200,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(fd_fake_c)", LOG_TAG_PKTIO, meta->session_id);
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param);
goto passthrough;
}
@@ -1201,6 +1210,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
TFE_LOG_ERROR(logger, "%s: session %lu Failed at tcp_restore_fd_create(fd_fake_s)", LOG_TAG_PKTIO, meta->session_id);
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param);
goto passthrough;
}
}
@@ -1217,12 +1227,19 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
TFE_LOG_ERROR(logger, "%s: session %lu Failed at tfe_proxy_fds_accept()", LOG_TAG_PKTIO, meta->session_id);
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param);
goto passthrough;
}
}
else if (parser->intercpet_data & (IS_SINGLE | IS_TUNNEL)) {
is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_underlying_stream_error);
if (parser->intercpet_data & IS_SINGLE) {
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s:single", reason_underlying_stream_error);
}
else if (parser->intercpet_data & IS_TUNNEL) {
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s:tunnel", reason_underlying_stream_error);
}
}
passthrough:
@@ -1234,7 +1251,7 @@ passthrough:
s_ctx->session_id = meta->session_id;
tuple4_tostring(&inner_tuple4, s_ctx->session_addr, sizeof(s_ctx->session_addr));
s_ctx->cmsg = parser->cmsg;
s_ctx->policy_ids = parser->tfe_policy_ids[0];
s_ctx->policy_ids = rule_id;
s_ctx->is_passthrough = is_passthrough;
metadata_deep_copy(s_ctx->ctrl_meta, meta);
sids_copy(&s_ctx->ctrl_meta->sids, &meta->sids);
@@ -1272,19 +1289,21 @@ passthrough:
FREE(&parser->seq_header);
if (parser->ack_header)
FREE(&parser->ack_header);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "active control packet, rule_id:%lu", rule_id);
return 0;
}
// return 0 : success
// return -1 : error
static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
static int handle_session_active(struct metadata *meta, marsio_buff_t *rx_buff, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id);
if (!node)
{
return handle_session_opening(meta, parser, thread_seq, ctx);
return handle_session_opening(meta, rx_buff, parser, thread_seq, ctx);
}
return 0;
@@ -1292,9 +1311,10 @@ static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *
// return 0 : success
// return -1 : error
static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
static int handle_session_closing(struct metadata *meta, marsio_buff_t *rx_buff, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct packet_io *packet_io = thread->ref_io;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
void * logger = thread->logger;
@@ -1304,6 +1324,7 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser
{
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "closing control packet");
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 1);
session_table_delete_by_id(thread->session_table, meta->session_id);
ATOMIC_DEC(&(packet_io_fs->session_num));
@@ -1315,14 +1336,16 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser
// return 0 : success
// return -1 : error
static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
static int handle_session_resetall(struct metadata *meta, marsio_buff_t *rx_buff, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
void * logger = thread->logger;
TFE_LOG_ERROR(logger, "%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "resetall control packet");
ATOMIC_ZERO(&(packet_io_fs->session_num));
for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++)
{
@@ -1373,13 +1396,13 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf
break;
case SESSION_STATE_CLOSING:
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_closing_num, 1, __ATOMIC_RELAXED);
return handle_session_closing(&meta, &ctrl_parser, thread_seq, ctx);
return handle_session_closing(&meta, rx_buff, &ctrl_parser, thread_seq, ctx);
case SESSION_STATE_ACTIVE:
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_active_num, 1, __ATOMIC_RELAXED);
return handle_session_active(&meta, &ctrl_parser, thread_seq, ctx);
return handle_session_active(&meta, rx_buff, &ctrl_parser, thread_seq, ctx);
case SESSION_STATE_RESETALL:
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_resetall_num, 1, __ATOMIC_RELAXED);
return handle_session_resetall(&meta, &ctrl_parser, thread_seq, ctx);
return handle_session_resetall(&meta, rx_buff, &ctrl_parser, thread_seq, ctx);
default:
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED);
break;
@@ -1415,6 +1438,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
LOG_TAG_PKTIO, meta.session_id, meta.raw_len, meta.is_e2i_dir, meta.is_ctrl_pkt, meta.l7offset, meta.is_decrypted, meta.sids.num);
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "get metadata failed");
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1;
}
@@ -1426,6 +1450,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->dup_bypass, 1, raw_len);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "duplicate packet");
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1;
}
@@ -1449,6 +1474,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
TFE_LOG_ERROR(logger, "packet from nf %lu: %s (ipid: %u) miss session table", meta.session_id, buffer, ipid);
}
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "search table for session_id:%lu failed", meta.session_id);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1;
}
@@ -1472,12 +1498,14 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
send_event_log(s_ctx, thread_seq, ctx);
tfe_cmsg_set_flag(s_ctx->cmsg, TFE_CMSG_FLAG_INIT);
}
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough");
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return 0;
}
if (meta.is_decrypted)
{
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "decrypted traffic");
throughput_metrics_inc(&packet_io_fs->decrypt_rx, 1, raw_len);
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
add_ether_header(raw_data, packet_io->config.tap_c_mac, packet_io->config.tap_s_mac);
@@ -1502,6 +1530,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
}
else
{
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "intercept");
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
s_ctx->c2s_info.sids = meta.sids;