TSG-21855 使用fieldstat4序列化Intercept Policy的metric并输出到kafka

This commit is contained in:
wangmenglan
2024-07-24 10:01:47 +08:00
parent dc1ec1dbb3
commit a59b939033
11 changed files with 366 additions and 190 deletions

View File

@@ -12,6 +12,7 @@
#include <tfe_utils.h>
#include <tfe_cmsg.h>
#include <proxy.h>
#include <tfe_resource.h>
#include "io_uring.h"
#include "tfe_packet_io_fs.h"
#include "tfe_tcp_restore.h"
@@ -22,6 +23,7 @@
#include "tfe_fieldstat.h"
#include "dablooms.h"
#include "timestamp.h"
#include "metrics.h"
void * g_packet_io_logger = NULL;
@@ -39,37 +41,13 @@ static int tap_read(int tap_fd, char *buff, int buff_size, void *logger)
return ret;
}
static struct tfe_fieldstat_easy_t *create_fieldstat4_instance(const char *profile, const char *section, int max_thread, void *logger)
{
int cycle=0;
char app_name[TFE_STRING_MAX]={0};
char outpath[TFE_STRING_MAX]={0};
struct tfe_fieldstat_easy_t *fieldstat_easy=NULL;
MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "metric");
MESA_load_profile_int_def(profile, section, "cycle", &cycle, 5);
MESA_load_profile_string_def(profile, section, "outpath", outpath, sizeof(outpath), "metrics/porxy_intercept_fieldstat.json");
fieldstat_easy = tfe_fieldstat_easy_create(app_name, outpath, cycle, max_thread, logger);
if (fieldstat_easy == NULL)
{
TFE_LOG_ERROR(logger, "tfe fieldstat init failed, error to create fieldstat metric.");
return NULL;
}
TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name);
TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle);
TFE_LOG_INFO(logger, "tfe fieldstat outpath : %s", outpath);
return fieldstat_easy;
}
void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx)
{
if (ctx)
{
packet_io_destory(ctx->io);
packet_io_fs_destory(ctx->packet_io_fs);
tfe_fieldstat_easy_destroy(ctx->metric);
metrics_destory(ctx->metrics);
free(ctx);
ctx = NULL;
}
@@ -113,8 +91,8 @@ struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile, void *logger)
goto error_out;
}
ctx->metric = create_fieldstat4_instance(profile, "proxy_hits", ctx->nr_worker_threads, logger);
if(ctx->metric == NULL)
ctx->metrics = metrics_create(profile, tfe_get_kafka_handle());
if(ctx->metrics == NULL)
{
goto error_out;
}
@@ -148,6 +126,13 @@ static void *worker_thread_cycle(void *arg)
struct io_uring_instance *io_uring_on_tap_c = thread_ctx->tap_ctx->io_uring_c;
struct io_uring_instance *io_uring_on_tap_s = thread_ctx->tap_ctx->io_uring_s;
struct timespec current_time;
clock_gettime(CLOCK_MONOTONIC, &current_time);
int timeout_ms = 0;
uint64_t current_timestamp = current_time.tv_sec * 1000 + current_time.tv_nsec / 1000000;
uint64_t metrics_last_send_ms = current_timestamp;
uint64_t metrics_output_interval_ms = metrics_get_interval(thread_ctx->ref_acceptor_ctx->metrics);
snprintf(thread_name, sizeof(thread_name), "pkt:worker-%d", thread_index);
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);
@@ -200,9 +185,16 @@ static void *worker_thread_cycle(void *arg)
}
}
clock_gettime(CLOCK_MONOTONIC, &current_time);
current_timestamp = current_time.tv_sec * 1000 + current_time.tv_nsec / 1000000;
if (n_pkt_recv == 0)
{
packet_io_thread_wait(handle, thread_ctx, -1);
timeout_ms = metrics_last_send_ms + metrics_output_interval_ms - current_timestamp;
if (timeout_ms <= 0)
{
timeout_ms = 0;
}
packet_io_thread_wait(handle, thread_ctx, timeout_ms);
}
if (ATOMIC_READ(&thread_ctx->session_table_need_reset) > 0)
@@ -210,6 +202,12 @@ static void *worker_thread_cycle(void *arg)
session_table_reset(thread_ctx->session_table);
ATOMIC_ZERO(&thread_ctx->session_table_need_reset);
}
if (current_timestamp - metrics_last_send_ms >= metrics_output_interval_ms)
{
metrics_all_session_output(thread_ctx);
metrics_last_send_ms = current_timestamp;
}
}
error_out: