TFE适配marsio_dp_trace_measurements_can_emit()的变更

This commit is contained in:
luwenpeng
2024-04-26 17:37:22 +08:00
parent 0d17311373
commit 909f3b291d
2 changed files with 78 additions and 32 deletions

View File

@@ -8,18 +8,62 @@ extern "C"
#include <marsio.h> #include <marsio.h>
#define DP_TRACE_INFO(mr_ins, mr_buff, module, fmt, ...) \ static inline void record_msg_on_ctrl_pkt(char *buff, int size, uint64_t rule_id, uint64_t session_id, const char *state, const char *action, const char *reason)
do \ {
{ \ int n = snprintf(buff, size, "intercept rule id=%lu, session=%lu, state=%s", rule_id, session_id, state);
if (marsio_dp_trace_measurements_can_emit(mr_ins, mr_buff)) \ if (action)
{ \ {
marsio_dp_trace_measurement_emit_fmt(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE, module, fmt, ##__VA_ARGS__); \ n += snprintf(buff + n, size - n, ", action=%s", action);
marsio_dp_trace_measurement_emit_fmt(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY, module, fmt, ##__VA_ARGS__); \ }
} \ if (reason)
} while (0) {
n += snprintf(buff + n, size - n, ", reason=%s", reason);
}
}
static inline void record_msg_on_raw_pkt(char *buff, int size, uint64_t rule_id, const char *traffic, const char *action, const char *reason)
{
int n = snprintf(buff, size, "intercept rule id=%lu, traffic=%s, action=%s", rule_id, traffic, action);
if (reason)
{
snprintf(buff + n, size - n, ", reason=%s", reason);
}
}
static inline void tfe_dp_telemetry_on_ctrl_pkt(mr_instance *mr_ins, marsio_buff_t *mr_buff, uint64_t rule_id, uint64_t session_id, const char *state, const char *action, const char *reason)
{
if (marsio_dp_trace_measurements_can_emit(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE))
{
char buff[512] = {0};
record_msg_on_ctrl_pkt(buff, sizeof(buff), rule_id, session_id, state, action, reason);
marsio_dp_trace_measurement_emit_str(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE, "Session Synchronization", buff);
}
if (marsio_dp_trace_measurements_can_emit(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY))
{
char buff[512] = {0};
record_msg_on_ctrl_pkt(buff, sizeof(buff), rule_id, session_id, state, action, reason);
marsio_dp_trace_measurement_emit_str(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY, "Session Synchronization", buff);
}
}
static inline void tfe_dp_on_raw_pkt(mr_instance *mr_ins, marsio_buff_t *mr_buff, uint64_t rule_id, const char *traffic, const char *action, const char *reason)
{
if (marsio_dp_trace_measurements_can_emit(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE))
{
char buff[512] = {0};
record_msg_on_raw_pkt(buff, sizeof(buff), rule_id, traffic, action, reason);
marsio_dp_trace_measurement_emit_str(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE, "Packet I/O", buff);
}
if (marsio_dp_trace_measurements_can_emit(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY))
{
char buff[512] = {0};
record_msg_on_raw_pkt(buff, sizeof(buff), rule_id, traffic, action, reason);
marsio_dp_trace_measurement_emit_str(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY, "Packet I/O", buff);
}
}
#ifdef __cpluscplus #ifdef __cpluscplus
} }
#endif #endif
#endif #endif

View File

@@ -1133,7 +1133,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff,
{ {
is_passthrough = 1; is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_intercept_param); set_passthrough_reason(parser->cmsg, reason_invalid_intercept_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_intercept_param); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, 0, meta->session_id, "active", "passthrough", "invalid intercept param");
goto passthrough; goto passthrough;
} }
tfe_cmsg_set(parser->cmsg, TFE_CMSG_POLICY_ID, (const unsigned char *)&rule_id, sizeof(uint64_t)); tfe_cmsg_set(parser->cmsg, TFE_CMSG_POLICY_ID, (const unsigned char *)&rule_id, sizeof(uint64_t));
@@ -1142,7 +1142,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff,
if (ret != 0) { if (ret != 0) {
is_passthrough = 1; is_passthrough = 1;
set_passthrough_reason(parser->cmsg, reason_invalid_intercept_param); set_passthrough_reason(parser->cmsg, reason_invalid_intercept_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_intercept_param); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid intercept param");
goto passthrough; goto passthrough;
} }
@@ -1151,7 +1151,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff,
is_passthrough = 1; is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->hit_no_intercept_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->hit_no_intercept_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_no_intercept_param); set_passthrough_reason(parser->cmsg, reason_no_intercept_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_no_intercept_param); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "hit no intercept");
goto passthrough; goto passthrough;
} }
__atomic_fetch_add(&packet_io_fs->hit_intercept_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->hit_intercept_num, 1, __ATOMIC_RELAXED);
@@ -1162,7 +1162,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff,
is_passthrough = 1; is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param");
goto passthrough; goto passthrough;
} }
@@ -1177,7 +1177,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff,
is_passthrough = 1; is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param");
goto passthrough; goto passthrough;
} }
tcp_restore_info_dump(&restore_info, meta->session_id, logger); tcp_restore_info_dump(&restore_info, meta->session_id, logger);
@@ -1189,7 +1189,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff,
is_passthrough = 1; is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param");
goto passthrough; goto passthrough;
} }
@@ -1201,7 +1201,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff,
is_passthrough = 1; is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param");
goto passthrough; goto passthrough;
} }
@@ -1219,7 +1219,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff,
is_passthrough = 1; is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param");
goto passthrough; goto passthrough;
} }
@@ -1230,7 +1230,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff,
is_passthrough = 1; is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param");
goto passthrough; goto passthrough;
} }
} }
@@ -1248,7 +1248,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff,
is_passthrough = 1; is_passthrough = 1;
__atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED);
set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param");
goto passthrough; goto passthrough;
} }
__atomic_fetch_add(&packet_io_fs->can_intercept_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->can_intercept_num, 1, __ATOMIC_RELAXED);
@@ -1258,11 +1258,11 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff,
set_passthrough_reason(parser->cmsg, reason_underlying_stream_error); set_passthrough_reason(parser->cmsg, reason_underlying_stream_error);
if (parser->intercpet_data & IS_SINGLE) { if (parser->intercpet_data & IS_SINGLE) {
__atomic_fetch_add(&packet_io_fs->asymmetric_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->asymmetric_num, 1, __ATOMIC_RELAXED);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s:asymmetric", reason_underlying_stream_error); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "asymmetric traffic");
} }
else if (parser->intercpet_data & IS_TUNNEL) { else if (parser->intercpet_data & IS_TUNNEL) {
__atomic_fetch_add(&packet_io_fs->tunnel_num, 1, __ATOMIC_RELAXED); __atomic_fetch_add(&packet_io_fs->tunnel_num, 1, __ATOMIC_RELAXED);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s:tunnel", reason_underlying_stream_error); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "tunnel traffic");
} }
} }
@@ -1314,7 +1314,10 @@ passthrough:
if (parser->ack_header) if (parser->ack_header)
FREE(&parser->ack_header); FREE(&parser->ack_header);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "active control packet, rule_id:%lu", rule_id); if (is_passthrough == 0)
{
tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "intercept", NULL);
}
return 0; return 0;
} }
@@ -1348,7 +1351,7 @@ static int handle_session_closing(struct metadata *meta, marsio_buff_t *rx_buff,
{ {
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id); TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "closing control packet"); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, s_ctx->policy_ids, meta->session_id, "closing", NULL, NULL);
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 1); tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 1);
session_table_delete_by_id(thread->session_table, meta->session_id); session_table_delete_by_id(thread->session_table, meta->session_id);
ATOMIC_DEC(&(packet_io_fs->session_num)); ATOMIC_DEC(&(packet_io_fs->session_num));
@@ -1369,7 +1372,7 @@ static int handle_session_resetall(struct metadata *meta, marsio_buff_t *rx_buff
void * logger = thread->logger; void * logger = thread->logger;
TFE_LOG_ERROR(logger, "%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id); TFE_LOG_ERROR(logger, "%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "resetall control packet"); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, 0, meta->session_id, "resetall", NULL, NULL);
ATOMIC_ZERO(&(packet_io_fs->session_num)); ATOMIC_ZERO(&(packet_io_fs->session_num));
for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++)
{ {
@@ -1462,7 +1465,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
LOG_TAG_PKTIO, meta.session_id, meta.raw_len, meta.is_e2i_dir, meta.is_ctrl_pkt, meta.l7offset, meta.is_decrypted, meta.sids.num); LOG_TAG_PKTIO, meta.session_id, meta.raw_len, meta.is_e2i_dir, meta.is_ctrl_pkt, meta.l7offset, meta.is_decrypted, meta.sids.num);
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "get metadata failed"); tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, 0, "raw", "passthrough", "miss metadata");
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1; return -1;
} }
@@ -1474,7 +1477,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->dup_bypass, 1, raw_len); throughput_metrics_inc(&packet_io_fs->dup_bypass, 1, raw_len);
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "duplicate packet"); tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, 0, "duplicated", "passthrough", NULL);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1; return -1;
} }
@@ -1497,8 +1500,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
tuple4_tostring(&inner_addr, buffer, sizeof(buffer)); tuple4_tostring(&inner_addr, buffer, sizeof(buffer));
TFE_LOG_ERROR(logger, "packet from nf %lu: %s (ipid: %u) miss session table", meta.session_id, buffer, ipid); TFE_LOG_ERROR(logger, "packet from nf %lu: %s (ipid: %u) miss session table", meta.session_id, buffer, ipid);
} }
tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, 0, "decrypted", "passthrough", "miss session");
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "search table for session_id:%lu failed", meta.session_id);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1; return -1;
} }
@@ -1522,14 +1524,14 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
send_event_log(s_ctx, thread_seq, ctx); send_event_log(s_ctx, thread_seq, ctx);
tfe_cmsg_set_flag(s_ctx->cmsg, TFE_CMSG_FLAG_INIT); tfe_cmsg_set_flag(s_ctx->cmsg, TFE_CMSG_FLAG_INIT);
} }
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough"); tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, s_ctx->policy_ids, "decrypted", "passthrough", NULL);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return 0; return 0;
} }
if (meta.is_decrypted) if (meta.is_decrypted)
{ {
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "decrypted traffic"); tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, s_ctx->policy_ids, "decrypted", "intercept", NULL);
throughput_metrics_inc(&packet_io_fs->decrypt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->decrypt_rx, 1, raw_len);
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) { if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
add_ether_header(raw_data, packet_io->config.tap_c_mac, packet_io->config.tap_s_mac); add_ether_header(raw_data, packet_io->config.tap_c_mac, packet_io->config.tap_s_mac);
@@ -1554,7 +1556,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
} }
else else
{ {
DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "intercept"); tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, s_ctx->policy_ids, "raw", "intercept", NULL);
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) { if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) {
s_ctx->c2s_info.sids = meta.sids; s_ctx->c2s_info.sids = meta.sids;