diff --git a/common/include/tfe_fieldstat.h b/common/include/tfe_fieldstat.h index 33826ef..baaf4c8 100644 --- a/common/include/tfe_fieldstat.h +++ b/common/include/tfe_fieldstat.h @@ -38,7 +38,7 @@ struct tfe_fieldstat_metric_t struct fieldstat_dynamic_instance *instance; }; -void tfe_set_intercept_metric(struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes, int thread_id); +void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes, int thread_id); int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id); struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger); void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat); diff --git a/common/include/tfe_packet_io.h b/common/include/tfe_packet_io.h index 9edf481..848d098 100644 --- a/common/include/tfe_packet_io.h +++ b/common/include/tfe_packet_io.h @@ -89,6 +89,7 @@ struct acceptor_kni_v4 struct packet_io *io; struct packet_io_fs *packet_io_fs; + struct tfe_fieldstat_metric_t *metric; struct packet_io_thread_ctx work_threads[TFE_THREAD_MAX]; struct tfe_proxy *ref_proxy; diff --git a/common/src/tfe_fieldstat.cpp b/common/src/tfe_fieldstat.cpp index b9ec2a6..b529297 100644 --- a/common/src/tfe_fieldstat.cpp +++ b/common/src/tfe_fieldstat.cpp @@ -4,11 +4,10 @@ #include "tfe_stream.h" #include "tfe_resource.h" -void tfe_set_intercept_metric(struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes, int thread_id) +void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes, int thread_id) { int ret; uint16_t out_size; - struct tfe_fieldstat_metric_t *fieldstat = (struct tfe_fieldstat_metric_t *)tfe_bussiness_resouce_get(DYNAMIC_FIELDSTAT); if (cmsg == NULL) { diff --git a/common/src/tfe_packet_io.cpp b/common/src/tfe_packet_io.cpp index 7fe0476..d3c1eae 100644 --- a/common/src/tfe_packet_io.cpp +++ b/common/src/tfe_packet_io.cpp @@ -1416,6 +1416,7 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser { struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; + struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; void * logger = thread->logger; struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id); @@ -1423,7 +1424,7 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser { struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id); - tfe_set_intercept_metric(s_ctx->cmsg, 1, s_ctx->c2s_info.rx.n_pkts, s_ctx->c2s_info.rx.n_bytes, s_ctx->s2c_info.rx.n_pkts, s_ctx->s2c_info.rx.n_bytes, thread_seq); + tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx->cmsg, 1, s_ctx->c2s_info.rx.n_pkts, s_ctx->c2s_info.rx.n_bytes, s_ctx->s2c_info.rx.n_pkts, s_ctx->s2c_info.rx.n_bytes, thread_seq); session_table_delete_by_id(thread->session_table, meta->session_id); ATOMIC_DEC(&(packet_io_fs->session_num)); return 0; diff --git a/platform/src/acceptor_kni_v4.cpp b/platform/src/acceptor_kni_v4.cpp index 18902ba..c1b0055 100644 --- a/platform/src/acceptor_kni_v4.cpp +++ b/platform/src/acceptor_kni_v4.cpp @@ -19,6 +19,7 @@ #include "tap.h" #include "tfe_packet_io.h" #include "tfe_session_table.h" +#include "tfe_fieldstat.h" void * g_packet_io_logger = NULL; @@ -37,12 +38,40 @@ static int tap_read(int tap_fd, char *buff, int buff_size, void *logger) return ret; } +static struct tfe_fieldstat_metric_t *create_fieldstat_instance(const char *profile, const char *section, int max_thread, void *logger) +{ + int cycle=0; + unsigned short telegraf_port=0; + char telegraf_ip[TFE_STRING_MAX]={0}; + char app_name[TFE_STRING_MAX]={0}; + struct tfe_fieldstat_metric_t *dynamic_fieldstat=NULL; + + MESA_load_profile_short_nodef(profile, section, "telegraf_port", (short *)&(telegraf_port)); + MESA_load_profile_string_nodef(profile, section, "telegraf_ip", telegraf_ip, sizeof(telegraf_ip)); + MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "metric"); + MESA_load_profile_int_def(profile, section, "cycle", &cycle, 1000); + + dynamic_fieldstat = tfe_fieldstat_metric_create(telegraf_ip, telegraf_port, app_name, cycle, max_thread, logger); + if (dynamic_fieldstat == NULL) + { + TFE_LOG_ERROR(logger, "tfe fieldstat init failed, error to create fieldstat metric."); + return NULL; + } + TFE_LOG_INFO(logger, "tfe fieldstat telegraf_ip : %s", telegraf_ip); + TFE_LOG_INFO(logger, "tfe fieldstat telegraf_port : %d", telegraf_port); + TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name); + TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle); + + return dynamic_fieldstat; +} + void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx) { if (ctx) { packet_io_destory(ctx->io); packet_io_fs_destory(ctx->packet_io_fs); + tfe_fieldstat_metric_destroy(ctx->metric); free(ctx); ctx = NULL; } @@ -79,6 +108,12 @@ struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile, void *logger) goto error_out; } + ctx->metric = create_fieldstat_instance(profile, "proxy_hits", ctx->nr_worker_threads, logger); + if(ctx->metric == NULL) + { + goto error_out; + } + return ctx; error_out: