diff --git a/common/include/tfe_dp_trace.h b/common/include/tfe_dp_trace.h index aa24512..a5c8931 100644 --- a/common/include/tfe_dp_trace.h +++ b/common/include/tfe_dp_trace.h @@ -8,18 +8,62 @@ extern "C" #include -#define DP_TRACE_INFO(mr_ins, mr_buff, module, fmt, ...) \ - do \ - { \ - if (marsio_dp_trace_measurements_can_emit(mr_ins, mr_buff)) \ - { \ - marsio_dp_trace_measurement_emit_fmt(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE, module, fmt, ##__VA_ARGS__); \ - marsio_dp_trace_measurement_emit_fmt(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY, module, fmt, ##__VA_ARGS__); \ - } \ - } while (0) +static inline void record_msg_on_ctrl_pkt(char *buff, int size, uint64_t rule_id, uint64_t session_id, const char *state, const char *action, const char *reason) +{ + int n = snprintf(buff, size, "intercept rule id=%lu, session=%lu, state=%s", rule_id, session_id, state); + if (action) + { + n += snprintf(buff + n, size - n, ", action=%s", action); + } + if (reason) + { + n += snprintf(buff + n, size - n, ", reason=%s", reason); + } +} + +static inline void record_msg_on_raw_pkt(char *buff, int size, uint64_t rule_id, const char *traffic, const char *action, const char *reason) +{ + int n = snprintf(buff, size, "intercept rule id=%lu, traffic=%s, action=%s", rule_id, traffic, action); + if (reason) + { + snprintf(buff + n, size - n, ", reason=%s", reason); + } +} + +static inline void tfe_dp_telemetry_on_ctrl_pkt(mr_instance *mr_ins, marsio_buff_t *mr_buff, uint64_t rule_id, uint64_t session_id, const char *state, const char *action, const char *reason) +{ + if (marsio_dp_trace_measurements_can_emit(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE)) + { + char buff[512] = {0}; + record_msg_on_ctrl_pkt(buff, sizeof(buff), rule_id, session_id, state, action, reason); + marsio_dp_trace_measurement_emit_str(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE, "Session Synchronization", buff); + } + if (marsio_dp_trace_measurements_can_emit(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)) + { + char buff[512] = {0}; + record_msg_on_ctrl_pkt(buff, sizeof(buff), rule_id, session_id, state, action, reason); + marsio_dp_trace_measurement_emit_str(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY, "Session Synchronization", buff); + } +} + +static inline void tfe_dp_on_raw_pkt(mr_instance *mr_ins, marsio_buff_t *mr_buff, uint64_t rule_id, const char *traffic, const char *action, const char *reason) +{ + if (marsio_dp_trace_measurements_can_emit(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE)) + { + char buff[512] = {0}; + record_msg_on_raw_pkt(buff, sizeof(buff), rule_id, traffic, action, reason); + marsio_dp_trace_measurement_emit_str(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE, "Packet I/O", buff); + } + if (marsio_dp_trace_measurements_can_emit(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)) + { + char buff[512] = {0}; + record_msg_on_raw_pkt(buff, sizeof(buff), rule_id, traffic, action, reason); + marsio_dp_trace_measurement_emit_str(mr_ins, mr_buff, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY, "Packet I/O", buff); + } +} #ifdef __cpluscplus } #endif -#endif \ No newline at end of file +#endif diff --git a/common/src/tfe_packet_io.cpp b/common/src/tfe_packet_io.cpp index eda7b38..7cb89c0 100644 --- a/common/src/tfe_packet_io.cpp +++ b/common/src/tfe_packet_io.cpp @@ -1133,7 +1133,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, { is_passthrough = 1; set_passthrough_reason(parser->cmsg, reason_invalid_intercept_param); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_intercept_param); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, 0, meta->session_id, "active", "passthrough", "invalid intercept param"); goto passthrough; } tfe_cmsg_set(parser->cmsg, TFE_CMSG_POLICY_ID, (const unsigned char *)&rule_id, sizeof(uint64_t)); @@ -1142,7 +1142,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, if (ret != 0) { is_passthrough = 1; set_passthrough_reason(parser->cmsg, reason_invalid_intercept_param); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_intercept_param); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid intercept param"); goto passthrough; } @@ -1151,7 +1151,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, is_passthrough = 1; __atomic_fetch_add(&packet_io_fs->hit_no_intercept_num, 1, __ATOMIC_RELAXED); set_passthrough_reason(parser->cmsg, reason_no_intercept_param); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_no_intercept_param); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "hit no intercept"); goto passthrough; } __atomic_fetch_add(&packet_io_fs->hit_intercept_num, 1, __ATOMIC_RELAXED); @@ -1162,7 +1162,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, is_passthrough = 1; __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param"); goto passthrough; } @@ -1177,7 +1177,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, is_passthrough = 1; __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param"); goto passthrough; } tcp_restore_info_dump(&restore_info, meta->session_id, logger); @@ -1189,7 +1189,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, is_passthrough = 1; __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param"); goto passthrough; } @@ -1201,7 +1201,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, is_passthrough = 1; __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param"); goto passthrough; } @@ -1219,7 +1219,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, is_passthrough = 1; __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param"); goto passthrough; } @@ -1230,7 +1230,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, is_passthrough = 1; __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param"); goto passthrough; } } @@ -1248,7 +1248,7 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, is_passthrough = 1; __atomic_fetch_add(&packet_io_fs->tcp_pcy_inval_num, 1, __ATOMIC_RELAXED); set_passthrough_reason(parser->cmsg, reason_invalid_tcp_policy_param); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s", reason_invalid_tcp_policy_param); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "invalid tcp policy param"); goto passthrough; } __atomic_fetch_add(&packet_io_fs->can_intercept_num, 1, __ATOMIC_RELAXED); @@ -1258,11 +1258,11 @@ static int handle_session_opening(struct metadata *meta, marsio_buff_t *rx_buff, set_passthrough_reason(parser->cmsg, reason_underlying_stream_error); if (parser->intercpet_data & IS_SINGLE) { __atomic_fetch_add(&packet_io_fs->asymmetric_num, 1, __ATOMIC_RELAXED); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s:asymmetric", reason_underlying_stream_error); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "asymmetric traffic"); } else if (parser->intercpet_data & IS_TUNNEL) { __atomic_fetch_add(&packet_io_fs->tunnel_num, 1, __ATOMIC_RELAXED); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough:%s:tunnel", reason_underlying_stream_error); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "passthrough", "tunnel traffic"); } } @@ -1314,7 +1314,10 @@ passthrough: if (parser->ack_header) FREE(&parser->ack_header); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "active control packet, rule_id:%lu", rule_id); + if (is_passthrough == 0) + { + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, rule_id, meta->session_id, "active", "intercept", NULL); + } return 0; } @@ -1348,7 +1351,7 @@ static int handle_session_closing(struct metadata *meta, marsio_buff_t *rx_buff, { struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "closing control packet"); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, s_ctx->policy_ids, meta->session_id, "closing", NULL, NULL); tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 1); session_table_delete_by_id(thread->session_table, meta->session_id); ATOMIC_DEC(&(packet_io_fs->session_num)); @@ -1369,7 +1372,7 @@ static int handle_session_resetall(struct metadata *meta, marsio_buff_t *rx_buff void * logger = thread->logger; TFE_LOG_ERROR(logger, "%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "resetall control packet"); + tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, 0, meta->session_id, "resetall", NULL, NULL); ATOMIC_ZERO(&(packet_io_fs->session_num)); for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) { @@ -1462,7 +1465,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx LOG_TAG_PKTIO, meta.session_id, meta.raw_len, meta.is_e2i_dir, meta.is_ctrl_pkt, meta.l7offset, meta.is_decrypted, meta.sids.num); throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "get metadata failed"); + tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, 0, "raw", "passthrough", "miss metadata"); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); return -1; } @@ -1474,7 +1477,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); throughput_metrics_inc(&packet_io_fs->raw_bypass, 1, raw_len); throughput_metrics_inc(&packet_io_fs->dup_bypass, 1, raw_len); - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "duplicate packet"); + tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, 0, "duplicated", "passthrough", NULL); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); return -1; } @@ -1497,8 +1500,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx tuple4_tostring(&inner_addr, buffer, sizeof(buffer)); TFE_LOG_ERROR(logger, "packet from nf %lu: %s (ipid: %u) miss session table", meta.session_id, buffer, ipid); } - - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "search table for session_id:%lu failed", meta.session_id); + tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, 0, "decrypted", "passthrough", "miss session"); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); return -1; } @@ -1522,14 +1524,14 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx send_event_log(s_ctx, thread_seq, ctx); tfe_cmsg_set_flag(s_ctx->cmsg, TFE_CMSG_FLAG_INIT); } - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "passthrough"); + tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, s_ctx->policy_ids, "decrypted", "passthrough", NULL); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); return 0; } if (meta.is_decrypted) { - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "decrypted traffic"); + tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, s_ctx->policy_ids, "decrypted", "intercept", NULL); throughput_metrics_inc(&packet_io_fs->decrypt_rx, 1, raw_len); if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) { add_ether_header(raw_data, packet_io->config.tap_c_mac, packet_io->config.tap_s_mac); @@ -1554,7 +1556,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx } else { - DP_TRACE_INFO(packet_io->instance, rx_buff, "packet I/O", "intercept"); + tfe_dp_on_raw_pkt(packet_io->instance, rx_buff, s_ctx->policy_ids, "raw", "intercept", NULL); throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); if (memcmp(&inner_addr, &s_ctx->c2s_info.tuple4, sizeof(struct tuple4)) == 0) { s_ctx->c2s_info.sids = meta.sids;