TFE Packet IO 创建metric句柄
This commit is contained in:
@@ -38,7 +38,7 @@ struct tfe_fieldstat_metric_t
|
|||||||
struct fieldstat_dynamic_instance *instance;
|
struct fieldstat_dynamic_instance *instance;
|
||||||
};
|
};
|
||||||
|
|
||||||
void tfe_set_intercept_metric(struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes, int thread_id);
|
void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes, int thread_id);
|
||||||
int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id);
|
int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id);
|
||||||
struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger);
|
struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger);
|
||||||
void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat);
|
void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat);
|
||||||
|
|||||||
@@ -89,6 +89,7 @@ struct acceptor_kni_v4
|
|||||||
|
|
||||||
struct packet_io *io;
|
struct packet_io *io;
|
||||||
struct packet_io_fs *packet_io_fs;
|
struct packet_io_fs *packet_io_fs;
|
||||||
|
struct tfe_fieldstat_metric_t *metric;
|
||||||
struct packet_io_thread_ctx work_threads[TFE_THREAD_MAX];
|
struct packet_io_thread_ctx work_threads[TFE_THREAD_MAX];
|
||||||
|
|
||||||
struct tfe_proxy *ref_proxy;
|
struct tfe_proxy *ref_proxy;
|
||||||
|
|||||||
@@ -4,11 +4,10 @@
|
|||||||
#include "tfe_stream.h"
|
#include "tfe_stream.h"
|
||||||
#include "tfe_resource.h"
|
#include "tfe_resource.h"
|
||||||
|
|
||||||
void tfe_set_intercept_metric(struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes, int thread_id)
|
void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct tfe_cmsg *cmsg, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes, int thread_id)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
uint16_t out_size;
|
uint16_t out_size;
|
||||||
struct tfe_fieldstat_metric_t *fieldstat = (struct tfe_fieldstat_metric_t *)tfe_bussiness_resouce_get(DYNAMIC_FIELDSTAT);
|
|
||||||
|
|
||||||
if (cmsg == NULL)
|
if (cmsg == NULL)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1416,6 +1416,7 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser
|
|||||||
{
|
{
|
||||||
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
|
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
|
||||||
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
|
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
|
||||||
|
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
|
||||||
void * logger = thread->logger;
|
void * logger = thread->logger;
|
||||||
|
|
||||||
struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id);
|
struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id);
|
||||||
@@ -1423,7 +1424,7 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser
|
|||||||
{
|
{
|
||||||
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
|
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
|
||||||
TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id);
|
TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id);
|
||||||
tfe_set_intercept_metric(s_ctx->cmsg, 1, s_ctx->c2s_info.rx.n_pkts, s_ctx->c2s_info.rx.n_bytes, s_ctx->s2c_info.rx.n_pkts, s_ctx->s2c_info.rx.n_bytes, thread_seq);
|
tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx->cmsg, 1, s_ctx->c2s_info.rx.n_pkts, s_ctx->c2s_info.rx.n_bytes, s_ctx->s2c_info.rx.n_pkts, s_ctx->s2c_info.rx.n_bytes, thread_seq);
|
||||||
session_table_delete_by_id(thread->session_table, meta->session_id);
|
session_table_delete_by_id(thread->session_table, meta->session_id);
|
||||||
ATOMIC_DEC(&(packet_io_fs->session_num));
|
ATOMIC_DEC(&(packet_io_fs->session_num));
|
||||||
return 0;
|
return 0;
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
#include "tap.h"
|
#include "tap.h"
|
||||||
#include "tfe_packet_io.h"
|
#include "tfe_packet_io.h"
|
||||||
#include "tfe_session_table.h"
|
#include "tfe_session_table.h"
|
||||||
|
#include "tfe_fieldstat.h"
|
||||||
|
|
||||||
|
|
||||||
void * g_packet_io_logger = NULL;
|
void * g_packet_io_logger = NULL;
|
||||||
@@ -37,12 +38,40 @@ static int tap_read(int tap_fd, char *buff, int buff_size, void *logger)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static struct tfe_fieldstat_metric_t *create_fieldstat_instance(const char *profile, const char *section, int max_thread, void *logger)
|
||||||
|
{
|
||||||
|
int cycle=0;
|
||||||
|
unsigned short telegraf_port=0;
|
||||||
|
char telegraf_ip[TFE_STRING_MAX]={0};
|
||||||
|
char app_name[TFE_STRING_MAX]={0};
|
||||||
|
struct tfe_fieldstat_metric_t *dynamic_fieldstat=NULL;
|
||||||
|
|
||||||
|
MESA_load_profile_short_nodef(profile, section, "telegraf_port", (short *)&(telegraf_port));
|
||||||
|
MESA_load_profile_string_nodef(profile, section, "telegraf_ip", telegraf_ip, sizeof(telegraf_ip));
|
||||||
|
MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "metric");
|
||||||
|
MESA_load_profile_int_def(profile, section, "cycle", &cycle, 1000);
|
||||||
|
|
||||||
|
dynamic_fieldstat = tfe_fieldstat_metric_create(telegraf_ip, telegraf_port, app_name, cycle, max_thread, logger);
|
||||||
|
if (dynamic_fieldstat == NULL)
|
||||||
|
{
|
||||||
|
TFE_LOG_ERROR(logger, "tfe fieldstat init failed, error to create fieldstat metric.");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
TFE_LOG_INFO(logger, "tfe fieldstat telegraf_ip : %s", telegraf_ip);
|
||||||
|
TFE_LOG_INFO(logger, "tfe fieldstat telegraf_port : %d", telegraf_port);
|
||||||
|
TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name);
|
||||||
|
TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle);
|
||||||
|
|
||||||
|
return dynamic_fieldstat;
|
||||||
|
}
|
||||||
|
|
||||||
void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx)
|
void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx)
|
||||||
{
|
{
|
||||||
if (ctx)
|
if (ctx)
|
||||||
{
|
{
|
||||||
packet_io_destory(ctx->io);
|
packet_io_destory(ctx->io);
|
||||||
packet_io_fs_destory(ctx->packet_io_fs);
|
packet_io_fs_destory(ctx->packet_io_fs);
|
||||||
|
tfe_fieldstat_metric_destroy(ctx->metric);
|
||||||
free(ctx);
|
free(ctx);
|
||||||
ctx = NULL;
|
ctx = NULL;
|
||||||
}
|
}
|
||||||
@@ -79,6 +108,12 @@ struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile, void *logger)
|
|||||||
goto error_out;
|
goto error_out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx->metric = create_fieldstat_instance(profile, "proxy_hits", ctx->nr_worker_threads, logger);
|
||||||
|
if(ctx->metric == NULL)
|
||||||
|
{
|
||||||
|
goto error_out;
|
||||||
|
}
|
||||||
|
|
||||||
return ctx;
|
return ctx;
|
||||||
|
|
||||||
error_out:
|
error_out:
|
||||||
|
|||||||
Reference in New Issue
Block a user