PacketIO实时发送metrics统计
This commit is contained in:
@@ -38,7 +38,7 @@ struct tfe_fieldstat_metric_t
|
|||||||
struct fieldstat_dynamic_instance *instance;
|
struct fieldstat_dynamic_instance *instance;
|
||||||
};
|
};
|
||||||
|
|
||||||
void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int downstream_dir, int upstream_rx_pkts, int upstream_rx_bytes, int upstream_dir, int thread_id);
|
void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct session_ctx *s_ctx, int thread_id, int is_session_close);
|
||||||
int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id);
|
int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id);
|
||||||
struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger);
|
struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger);
|
||||||
void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat);
|
void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat);
|
||||||
|
|||||||
@@ -53,6 +53,7 @@ struct packet_info
|
|||||||
struct sids sids;
|
struct sids sids;
|
||||||
struct route_ctx route_ctx;
|
struct route_ctx route_ctx;
|
||||||
struct throughput_metrics rx;
|
struct throughput_metrics rx;
|
||||||
|
struct throughput_metrics rx_send_complete;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct session_ctx
|
struct session_ctx
|
||||||
@@ -61,10 +62,12 @@ struct session_ctx
|
|||||||
uint64_t session_id;
|
uint64_t session_id;
|
||||||
uint8_t is_passthrough;
|
uint8_t is_passthrough;
|
||||||
uint8_t protocol;
|
uint8_t protocol;
|
||||||
|
uint8_t metric_hit;
|
||||||
char session_addr[128];
|
char session_addr[128];
|
||||||
|
|
||||||
struct packet_info c2s_info;
|
struct packet_info c2s_info;
|
||||||
struct packet_info s2c_info;
|
struct packet_info s2c_info;
|
||||||
|
struct timespec metrics_last_time;
|
||||||
|
|
||||||
struct metadata *ctrl_meta;
|
struct metadata *ctrl_meta;
|
||||||
|
|
||||||
|
|||||||
@@ -3,17 +3,45 @@
|
|||||||
|
|
||||||
#include "tfe_stream.h"
|
#include "tfe_stream.h"
|
||||||
#include "tfe_resource.h"
|
#include "tfe_resource.h"
|
||||||
|
#include "tfe_packet_io.h"
|
||||||
|
|
||||||
void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int downstream_dir, int upstream_rx_pkts, int upstream_rx_bytes, int upstream_dir, int thread_id)
|
void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct session_ctx *s_ctx, int thread_id, int is_session_close)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
uint16_t out_size;
|
uint16_t out_size;
|
||||||
|
struct tfe_cmsg *cmsg = s_ctx->cmsg;
|
||||||
|
struct timespec current_time;
|
||||||
|
|
||||||
if (cmsg == NULL)
|
if (cmsg == NULL)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!is_session_close)
|
||||||
|
{
|
||||||
|
clock_gettime(CLOCK_MONOTONIC, ¤t_time);
|
||||||
|
if (current_time.tv_sec - s_ctx->metrics_last_time.tv_sec < 1)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s_ctx->metrics_last_time = current_time;
|
||||||
|
int downstream_dir = s_ctx->c2s_info.is_e2i_dir;
|
||||||
|
int downstream_rx_pkts = s_ctx->c2s_info.rx.n_pkts - s_ctx->c2s_info.rx_send_complete.n_pkts;
|
||||||
|
int downstream_rx_bytes = s_ctx->c2s_info.rx.n_bytes - s_ctx->c2s_info.rx_send_complete.n_bytes;
|
||||||
|
int upstream_dir = s_ctx->s2c_info.is_e2i_dir;
|
||||||
|
int upstream_rx_pkts = s_ctx->s2c_info.rx.n_pkts - s_ctx->s2c_info.rx_send_complete.n_pkts;
|
||||||
|
int upstream_rx_bytes = s_ctx->s2c_info.rx.n_bytes - s_ctx->s2c_info.rx_send_complete.n_bytes;
|
||||||
|
s_ctx->c2s_info.rx_send_complete = s_ctx->c2s_info.rx;
|
||||||
|
s_ctx->s2c_info.rx_send_complete = s_ctx->s2c_info.rx;
|
||||||
|
int hit_count = 0;
|
||||||
|
|
||||||
|
if (s_ctx->metric_hit == 0) {
|
||||||
|
s_ctx->metric_hit = 1;
|
||||||
|
hit_count = 1;
|
||||||
|
}
|
||||||
|
|
||||||
int vsys_id = 0;
|
int vsys_id = 0;
|
||||||
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_VSYS_ID, (unsigned char *)&vsys_id, sizeof(vsys_id), &out_size);
|
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_VSYS_ID, (unsigned char *)&vsys_id, sizeof(vsys_id), &out_size);
|
||||||
if (ret != 0)
|
if (ret != 0)
|
||||||
@@ -44,29 +72,29 @@ void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct t
|
|||||||
int out_bytes = 0;
|
int out_bytes = 0;
|
||||||
|
|
||||||
// incoming : E2I 的流量
|
// incoming : E2I 的流量
|
||||||
// outcoming : I2E 的流量
|
// outgoing : I2E 的流量
|
||||||
// first_ctr_packet_dir <==> client hello packet dir
|
// first_ctr_packet_dir <==> client hello packet dir
|
||||||
// 1: E2I 0:I2E
|
// 1: E2I 0:I2E
|
||||||
if (downstream_dir == 1)
|
if (downstream_dir == 1)
|
||||||
{
|
{
|
||||||
in_pkts = downstream_rx_pkts;
|
in_pkts += downstream_rx_pkts;
|
||||||
in_bytes = downstream_rx_bytes;
|
in_bytes += downstream_rx_bytes;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
out_pkts = downstream_rx_pkts;
|
out_pkts += downstream_rx_pkts;
|
||||||
out_bytes = downstream_rx_bytes;
|
out_bytes += downstream_rx_bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (upstream_dir == 1)
|
if (upstream_dir == 1)
|
||||||
{
|
{
|
||||||
in_pkts = upstream_rx_pkts;
|
in_pkts += upstream_rx_pkts;
|
||||||
in_bytes = upstream_rx_bytes;
|
in_bytes += upstream_rx_bytes;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
out_pkts = upstream_rx_pkts;
|
out_pkts += upstream_rx_pkts;
|
||||||
out_bytes = upstream_rx_bytes;
|
out_bytes += upstream_rx_bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
int nr_tags = 0;
|
int nr_tags = 0;
|
||||||
|
|||||||
@@ -1304,7 +1304,7 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser
|
|||||||
{
|
{
|
||||||
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
|
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
|
||||||
TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id);
|
TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id);
|
||||||
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx->cmsg, 1, s_ctx->c2s_info.rx.n_pkts, s_ctx->c2s_info.rx.n_bytes, s_ctx->c2s_info.is_e2i_dir, s_ctx->s2c_info.rx.n_pkts, s_ctx->s2c_info.rx.n_bytes, s_ctx->s2c_info.is_e2i_dir, thread_seq);
|
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 1);
|
||||||
session_table_delete_by_id(thread->session_table, meta->session_id);
|
session_table_delete_by_id(thread->session_table, meta->session_id);
|
||||||
ATOMIC_DEC(&(packet_io_fs->session_num));
|
ATOMIC_DEC(&(packet_io_fs->session_num));
|
||||||
return 0;
|
return 0;
|
||||||
@@ -1391,6 +1391,7 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf
|
|||||||
static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx)
|
static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx)
|
||||||
{
|
{
|
||||||
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
|
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
|
||||||
|
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
|
||||||
struct packet_io *packet_io = thread->ref_io;
|
struct packet_io *packet_io = thread->ref_io;
|
||||||
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
|
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
|
||||||
struct packet pkt;
|
struct packet pkt;
|
||||||
@@ -1465,6 +1466,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
|
|||||||
throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len);
|
throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 0);
|
||||||
flag = tfe_cmsg_get_flag(s_ctx->cmsg);
|
flag = tfe_cmsg_get_flag(s_ctx->cmsg);
|
||||||
if (flag & TFE_CMSG_FLAG_USER0) {
|
if (flag & TFE_CMSG_FLAG_USER0) {
|
||||||
send_event_log(s_ctx, thread_seq, ctx);
|
send_event_log(s_ctx, thread_seq, ctx);
|
||||||
@@ -1517,6 +1519,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
|
|||||||
is_ipv4 = s_ctx->s2c_info.is_ipv4;
|
is_ipv4 = s_ctx->s2c_info.is_ipv4;
|
||||||
throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len);
|
throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len);
|
||||||
}
|
}
|
||||||
|
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 0);
|
||||||
|
|
||||||
if (header != NULL) {
|
if (header != NULL) {
|
||||||
char *packet_buff = NULL;
|
char *packet_buff = NULL;
|
||||||
|
|||||||
Reference in New Issue
Block a user