TSG-21854 TFE使用fieldstat4序列化Manipulation Policy的metric并输出到kafka

This commit is contained in:
fengweihao
2024-07-22 17:22:45 +08:00
parent 2045d517ca
commit dc1ec1dbb3
14 changed files with 135 additions and 120 deletions

View File

@@ -39,31 +39,28 @@ static int tap_read(int tap_fd, char *buff, int buff_size, void *logger)
return ret;
}
static struct tfe_fieldstat_metric_t *create_fieldstat_instance(const char *profile, const char *section, int max_thread, void *logger)
static struct tfe_fieldstat_easy_t *create_fieldstat4_instance(const char *profile, const char *section, int max_thread, void *logger)
{
int cycle=0;
unsigned short telegraf_port=0;
char telegraf_ip[TFE_STRING_MAX]={0};
char app_name[TFE_STRING_MAX]={0};
struct tfe_fieldstat_metric_t *dynamic_fieldstat=NULL;
char outpath[TFE_STRING_MAX]={0};
struct tfe_fieldstat_easy_t *fieldstat_easy=NULL;
MESA_load_profile_short_nodef(profile, section, "telegraf_port", (short *)&(telegraf_port));
MESA_load_profile_string_nodef(profile, section, "telegraf_ip", telegraf_ip, sizeof(telegraf_ip));
MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "metric");
MESA_load_profile_int_def(profile, section, "cycle", &cycle, 1000);
MESA_load_profile_int_def(profile, section, "cycle", &cycle, 5);
MESA_load_profile_string_def(profile, section, "outpath", outpath, sizeof(outpath), "metrics/porxy_intercept_fieldstat.json");
dynamic_fieldstat = tfe_fieldstat_metric_create(telegraf_ip, telegraf_port, app_name, cycle, max_thread, logger);
if (dynamic_fieldstat == NULL)
fieldstat_easy = tfe_fieldstat_easy_create(app_name, outpath, cycle, max_thread, logger);
if (fieldstat_easy == NULL)
{
TFE_LOG_ERROR(logger, "tfe fieldstat init failed, error to create fieldstat metric.");
return NULL;
}
TFE_LOG_INFO(logger, "tfe fieldstat telegraf_ip : %s", telegraf_ip);
TFE_LOG_INFO(logger, "tfe fieldstat telegraf_port : %d", telegraf_port);
TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name);
TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle);
TFE_LOG_INFO(logger, "tfe fieldstat outpath : %s", outpath);
return dynamic_fieldstat;
return fieldstat_easy;
}
void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx)
@@ -72,7 +69,7 @@ void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx)
{
packet_io_destory(ctx->io);
packet_io_fs_destory(ctx->packet_io_fs);
tfe_fieldstat_metric_destroy(ctx->metric);
tfe_fieldstat_easy_destroy(ctx->metric);
free(ctx);
ctx = NULL;
}
@@ -116,7 +113,7 @@ struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile, void *logger)
goto error_out;
}
ctx->metric = create_fieldstat_instance(profile, "proxy_hits", ctx->nr_worker_threads, logger);
ctx->metric = create_fieldstat4_instance(profile, "proxy_hits", ctx->nr_worker_threads, logger);
if(ctx->metric == NULL)
{
goto error_out;
@@ -171,7 +168,7 @@ static void *worker_thread_cycle(void *arg)
io_uring_set_read_cb(io_uring_on_tap_s, handle_decryption_packet_from_tap, thread_ctx);
}
TFE_LOG_INFO(logger, "worker thread %d is running", thread_index);
TFE_LOG_INFO(logger, "%s: worker thread %d is running", "LOG_TAG_KNI", thread_index);
while (1)
{
@@ -216,7 +213,7 @@ static void *worker_thread_cycle(void *arg)
}
error_out:
TFE_LOG_ERROR(logger, "worker thread %d exiting", thread_index);
TFE_LOG_ERROR(logger, "%s: worker thread %d exiting", "LOG_TAG_KNI", thread_index);
return (void *)NULL;
}

View File

@@ -2061,9 +2061,7 @@ void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct
void ssl_stream_free(struct ssl_stream * s_stream, struct event_base * evbase, struct bufferevent * bev)
{
UNUSED struct ssl_shutdown_ctx * sslshutctx = NULL;
evutil_socket_t fd=-1;
fd=bufferevent_getfd(bev);
assert(fd==s_stream->_do_not_use.fd);
assert(bufferevent_getfd(bev)==s_stream->_do_not_use.fd);
unsigned long sslerr=0;
if (s_stream->dir == CONN_DIR_UPSTREAM)
@@ -2217,6 +2215,7 @@ uint64_t ssl_stream_get_policy_id(struct ssl_stream *upstream)
struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(upstream->tcp_stream);
int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *)&policy_id, sizeof(policy_id), &out_size);
assert(ret == 0);
(void)ret;
return policy_id;
}
@@ -2228,6 +2227,7 @@ int ssl_stream_get_decrypted_profile_id(struct ssl_stream *upstream)
struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(upstream->tcp_stream);
int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_DECRYPTION_PROFILE_ID, (unsigned char *)&profile_id, sizeof(profile_id), &out_size);
assert(ret == 0);
(void)ret;
return profile_id;
}
@@ -2239,6 +2239,7 @@ int ssl_stream_get_trusted_keyring_profile_id(struct ssl_stream *upstream)
struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(upstream->tcp_stream);
int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_KEYRING_FOR_TRUSTED_ID, (unsigned char *)&keyring_id, sizeof(keyring_id), &out_size);
assert(ret == 0);
(void)ret;
return keyring_id;
}
@@ -2250,6 +2251,7 @@ int ssl_stream_get_untrusted_keyring_profile_id(struct ssl_stream *upstream)
struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(upstream->tcp_stream);
int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_KEYRING_FOR_UNTRUSTED, (unsigned char *)&keyring_id, sizeof(keyring_id), &out_size);
assert(ret == 0);
(void)ret;
return keyring_id;
}