diff --git a/common/include/tfe_fieldstat.h b/common/include/tfe_fieldstat.h index aaa733e..878f14f 100644 --- a/common/include/tfe_fieldstat.h +++ b/common/include/tfe_fieldstat.h @@ -38,7 +38,7 @@ struct tfe_fieldstat_metric_t struct fieldstat_dynamic_instance *instance; }; -void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int downstream_dir, int upstream_rx_pkts, int upstream_rx_bytes, int upstream_dir, int thread_id); +void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct session_ctx *s_ctx, int thread_id, int is_session_close); int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id); struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger); void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat); diff --git a/common/include/tfe_packet_io.h b/common/include/tfe_packet_io.h index 137ecc6..6d5646d 100644 --- a/common/include/tfe_packet_io.h +++ b/common/include/tfe_packet_io.h @@ -53,6 +53,7 @@ struct packet_info struct sids sids; struct route_ctx route_ctx; struct throughput_metrics rx; + struct throughput_metrics rx_send_complete; }; struct session_ctx @@ -61,10 +62,12 @@ struct session_ctx uint64_t session_id; uint8_t is_passthrough; uint8_t protocol; + uint8_t metric_hit; char session_addr[128]; struct packet_info c2s_info; struct packet_info s2c_info; + struct timespec metrics_last_time; struct metadata *ctrl_meta; diff --git a/common/src/tfe_fieldstat.cpp b/common/src/tfe_fieldstat.cpp index 2fc1e76..04523af 100644 --- a/common/src/tfe_fieldstat.cpp +++ b/common/src/tfe_fieldstat.cpp @@ -3,17 +3,45 @@ #include "tfe_stream.h" #include "tfe_resource.h" +#include "tfe_packet_io.h" -void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int downstream_dir, int upstream_rx_pkts, int upstream_rx_bytes, int upstream_dir, int thread_id) +void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct session_ctx *s_ctx, int thread_id, int is_session_close) { int ret; uint16_t out_size; + struct tfe_cmsg *cmsg = s_ctx->cmsg; + struct timespec current_time; if (cmsg == NULL) { return; } + if (!is_session_close) + { + clock_gettime(CLOCK_MONOTONIC, ¤t_time); + if (current_time.tv_sec - s_ctx->metrics_last_time.tv_sec < 1) + { + return; + } + } + + s_ctx->metrics_last_time = current_time; + int downstream_dir = s_ctx->c2s_info.is_e2i_dir; + int downstream_rx_pkts = s_ctx->c2s_info.rx.n_pkts - s_ctx->c2s_info.rx_send_complete.n_pkts; + int downstream_rx_bytes = s_ctx->c2s_info.rx.n_bytes - s_ctx->c2s_info.rx_send_complete.n_bytes; + int upstream_dir = s_ctx->s2c_info.is_e2i_dir; + int upstream_rx_pkts = s_ctx->s2c_info.rx.n_pkts - s_ctx->s2c_info.rx_send_complete.n_pkts; + int upstream_rx_bytes = s_ctx->s2c_info.rx.n_bytes - s_ctx->s2c_info.rx_send_complete.n_bytes; + s_ctx->c2s_info.rx_send_complete = s_ctx->c2s_info.rx; + s_ctx->s2c_info.rx_send_complete = s_ctx->s2c_info.rx; + int hit_count = 0; + + if (s_ctx->metric_hit == 0) { + s_ctx->metric_hit = 1; + hit_count = 1; + } + int vsys_id = 0; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_VSYS_ID, (unsigned char *)&vsys_id, sizeof(vsys_id), &out_size); if (ret != 0) @@ -44,29 +72,29 @@ void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct t int out_bytes = 0; // incoming : E2I 的流量 - // outcoming : I2E 的流量 + // outgoing : I2E 的流量 // first_ctr_packet_dir <==> client hello packet dir // 1: E2I 0:I2E if (downstream_dir == 1) { - in_pkts = downstream_rx_pkts; - in_bytes = downstream_rx_bytes; + in_pkts += downstream_rx_pkts; + in_bytes += downstream_rx_bytes; } else { - out_pkts = downstream_rx_pkts; - out_bytes = downstream_rx_bytes; + out_pkts += downstream_rx_pkts; + out_bytes += downstream_rx_bytes; } if (upstream_dir == 1) { - in_pkts = upstream_rx_pkts; - in_bytes = upstream_rx_bytes; + in_pkts += upstream_rx_pkts; + in_bytes += upstream_rx_bytes; } else { - out_pkts = upstream_rx_pkts; - out_bytes = upstream_rx_bytes; + out_pkts += upstream_rx_pkts; + out_bytes += upstream_rx_bytes; } int nr_tags = 0; diff --git a/common/src/tfe_packet_io.cpp b/common/src/tfe_packet_io.cpp index ce2cf0b..7ecb2e9 100644 --- a/common/src/tfe_packet_io.cpp +++ b/common/src/tfe_packet_io.cpp @@ -1304,7 +1304,7 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser { struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id); - tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx->cmsg, 1, s_ctx->c2s_info.rx.n_pkts, s_ctx->c2s_info.rx.n_bytes, s_ctx->c2s_info.is_e2i_dir, s_ctx->s2c_info.rx.n_pkts, s_ctx->s2c_info.rx.n_bytes, s_ctx->s2c_info.is_e2i_dir, thread_seq); + tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 1); session_table_delete_by_id(thread->session_table, meta->session_id); ATOMIC_DEC(&(packet_io_fs->session_num)); return 0; @@ -1391,6 +1391,7 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx) { struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; + struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; struct packet pkt; @@ -1465,6 +1466,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len); } + tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 0); flag = tfe_cmsg_get_flag(s_ctx->cmsg); if (flag & TFE_CMSG_FLAG_USER0) { send_event_log(s_ctx, thread_seq, ctx); @@ -1517,6 +1519,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx is_ipv4 = s_ctx->s2c_info.is_ipv4; throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len); } + tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 0); if (header != NULL) { char *packet_buff = NULL;