TSG-21854 TFE使用fieldstat4序列化Manipulation Policy的metric并输出到kafka

This commit is contained in:
fengweihao
2024-07-26 16:50:51 +08:00
parent a59b939033
commit 83f51432b1
17 changed files with 382 additions and 406 deletions

View File

@@ -23,7 +23,6 @@
#include "tfe_fieldstat.h"
#include "dablooms.h"
#include "timestamp.h"
#include "metrics.h"
void * g_packet_io_logger = NULL;
@@ -47,13 +46,28 @@ void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx)
{
packet_io_destory(ctx->io);
packet_io_fs_destory(ctx->packet_io_fs);
metrics_destory(ctx->metrics);
free(ctx);
ctx = NULL;
}
return;
}
struct fieldstat_easy_intercept *packet_io_fieldstat_easy_create(const char *profile, void *logger)
{
int packet_io_threads=0;
int output_fs_interval_ms=0;
char app_name[TFE_STRING_MAX]={0};
struct fieldstat_easy_intercept *intercept=NULL;
MESA_load_profile_int_def(profile, "packet_io", "packet_io_threads", &packet_io_threads, 0);
MESA_load_profile_int_def(profile, "proxy_hits", "output_fs_interval_ms", &output_fs_interval_ms, 500);
MESA_load_profile_string_def(profile, "proxy_hits", "app_name", app_name, sizeof(app_name), "proxy_rule_hits");
intercept = tfe_fieldstat_easy_intercept_create(app_name, packet_io_threads, output_fs_interval_ms, logger);
return intercept;
}
struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile, void *logger)
{
struct acceptor_kni_v4 *ctx = ALLOC(struct acceptor_kni_v4, 1);
@@ -90,12 +104,8 @@ struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile, void *logger)
{
goto error_out;
}
ctx->metrics = metrics_create(profile, tfe_get_kafka_handle());
if(ctx->metrics == NULL)
{
goto error_out;
}
ctx->metrics = packet_io_fieldstat_easy_create(profile, logger);
tfe_get_fieldstat_handle()->intercept = ctx->metrics;
return ctx;
@@ -104,6 +114,19 @@ error_out:
return NULL;
}
void metrics_all_session_output(struct packet_io_thread_ctx *thread_ctx)
{
if (thread_ctx == NULL)
return;
int thread_index = thread_ctx->thread_index;
struct session_table *session_table = thread_ctx->session_table;
struct fieldstat_easy_intercept *metrics=thread_ctx->ref_acceptor_ctx->metrics;
session_foreach(session_table, metrics, tfe_fieldstat_intercept_incrby, thread_index);
return;
}
static void *worker_thread_cycle(void *arg)
{
struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)arg;
@@ -131,7 +154,7 @@ static void *worker_thread_cycle(void *arg)
int timeout_ms = 0;
uint64_t current_timestamp = current_time.tv_sec * 1000 + current_time.tv_nsec / 1000000;
uint64_t metrics_last_send_ms = current_timestamp;
uint64_t metrics_output_interval_ms = metrics_get_interval(thread_ctx->ref_acceptor_ctx->metrics);
uint64_t metrics_output_interval_ms = tfe_fieldstat_get_output_interval(thread_ctx->ref_acceptor_ctx->metrics);
snprintf(thread_name, sizeof(thread_name), "pkt:worker-%d", thread_index);
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);