diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 0240edc..f2d9570 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -5,7 +5,7 @@ src/tap.cpp src/io_uring.cpp src/intercept_policy.cpp src/tfe_fieldstat.cpp src/tuple.cpp src/tfe_packet_io.cpp src/tfe_session_table.cpp src/tfe_ctrl_packet.cpp src/packet.cpp src/tfe_packet_io_fs.cpp - src/mpack.cpp src/dablooms.cpp src/murmur.cpp src/timestamp.cpp) + src/mpack.cpp src/dablooms.cpp src/murmur.cpp src/timestamp.cpp src/metrics.cpp) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/../bpf/) target_include_directories(common PRIVATE ${CMAKE_CURRENT_LIST_DIR}/../platform/include/internal) diff --git a/common/include/metrics.h b/common/include/metrics.h new file mode 100644 index 0000000..4096e59 --- /dev/null +++ b/common/include/metrics.h @@ -0,0 +1,23 @@ +#ifndef _METRICS_H +#define _METRICS_H + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "kafka.h" +#include "tfe_packet_io.h" + +struct metrics *metrics_create(const char *profile, struct kafka *kfk); +void metrics_destory(struct metrics *handle); + +void metrics_single_session_output(struct session_node *node, void *ctx); +void metrics_all_session_output(struct packet_io_thread_ctx *thread_ctx); +int metrics_get_interval(struct metrics *handle); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/include/tfe_fieldstat.h b/common/include/tfe_fieldstat.h index d22f741..de66f90 100644 --- a/common/include/tfe_fieldstat.h +++ b/common/include/tfe_fieldstat.h @@ -39,7 +39,6 @@ struct tfe_fieldstat_easy_t }; struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(char *app_name, char *outpath, int cycle, int max_thread, void *local_logger); -void tfe_set_intercept_metric(struct tfe_fieldstat_easy_t *fieldstat, struct session_ctx *s_ctx, int thread_id, int is_session_close); void tfe_fieldstat_easy_destroy(struct tfe_fieldstat_easy_t *fieldstat); int tfe_fieldstat_easy_incrby(struct tfe_fieldstat_easy_t *fieldstat, unsigned int counter_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id); diff --git a/common/include/tfe_packet_io.h b/common/include/tfe_packet_io.h index a1ce898..6880e5d 100644 --- a/common/include/tfe_packet_io.h +++ b/common/include/tfe_packet_io.h @@ -63,11 +63,11 @@ struct session_ctx uint8_t is_passthrough; uint8_t protocol; uint8_t metric_hit; + uint8_t send_log_flag; char session_addr[128]; struct packet_info c2s_info; struct packet_info s2c_info; - struct timespec metrics_last_time; struct metadata *ctrl_meta; @@ -93,7 +93,7 @@ struct acceptor_kni_v4 struct packet_io *io; struct packet_io_fs *packet_io_fs; - struct tfe_fieldstat_easy_t *metric; + struct metrics *metrics; struct packet_io_thread_ctx work_threads[TFE_THREAD_MAX]; struct tfe_proxy *ref_proxy; diff --git a/common/include/tfe_session_table.h b/common/include/tfe_session_table.h index 5fa3b78..7eff076 100644 --- a/common/include/tfe_session_table.h +++ b/common/include/tfe_session_table.h @@ -51,6 +51,7 @@ int session_table_delete_by_addr(struct session_table *table, const struct tuple struct session_node *session_table_search_by_id(struct session_table *table, uint64_t session_id); struct session_node *session_table_search_by_addr(struct session_table *table, const struct tuple4 *session_addr); +void session_foreach(struct session_table *table, void (*func)(struct session_node *, void *), void *ctx); #ifdef __cpluscplus } #endif diff --git a/common/src/metrics.cpp b/common/src/metrics.cpp new file mode 100644 index 0000000..7b6ed5f --- /dev/null +++ b/common/src/metrics.cpp @@ -0,0 +1,290 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "uthash.h" +#include "metrics.h" +#include "tfe_utils.h" +#include "tfe_cmsg.h" +#include "tfe_session_table.h" + +#define FIELDSTAT_TAG_INIT(ptr, index, _key, _type, _value) \ + do { \ + ptr[index].key = _key; \ + ptr[index].type = _type; \ + ptr[index].value_longlong = _value; \ + }while(0) + +struct config +{ + uint16_t thr_num; + int output_fs_interval_ms; + int output_kafka_interval_ms; + char data_center[256]; + char device_group[256]; + char device_id[256]; +}; + +struct metrics +{ + struct config cfg; + + int hit_count_idx; + int in_bytes_idx; + int out_bytes_idx; + int in_pkts_idx; + int out_pkts_idx; + + pthread_t tid; + int thr_is_runing; + int thr_need_exit; + struct kafka *kfk; + struct fieldstat_easy *fs; +}; + +/****************************************************************************** + * Private API + ******************************************************************************/ + +static void *fs2kafka_thread_cycle(void *arg) +{ + struct metrics *handle = (struct metrics *)arg; + ATOMIC_SET(&handle->thr_is_runing, 1); + + char **ptr = NULL; + size_t len = 0; + while (!ATOMIC_READ(&handle->thr_need_exit)) + { + fieldstat_easy_output_array_and_reset(handle->fs, &ptr, &len); + if (ptr) + { + for (size_t i = 0; i < len; i++) + { + kafka_send(handle->kfk, TOPIC_RULE_HITS, ptr[i], strlen(ptr[i])); + free(ptr[i]); + ptr[i] = NULL; + } + free(ptr); + } + + usleep(handle->cfg.output_kafka_interval_ms * 1000); + } + ATOMIC_SET(&handle->thr_is_runing, 0); + + return NULL; +} + +/****************************************************************************** + * Public API + ******************************************************************************/ + +struct metrics *metrics_create(const char *profile, struct kafka *kfk) +{ + struct metrics *handle = (struct metrics *)calloc(1, sizeof(struct metrics)); + if (!handle) + { + return NULL; + } + + MESA_load_profile_int_def(profile, "packet_io", "packet_io_threads", (int *)&(handle->cfg.thr_num), 0); + MESA_load_profile_int_def(profile, "metrics", "output_fs_interval_ms", &(handle->cfg.output_fs_interval_ms), 500); + MESA_load_profile_int_def(profile, "metrics", "output_kafka_interval_ms", &(handle->cfg.output_kafka_interval_ms), 1000); + MESA_load_profile_string_def(profile, "public", "data_center", handle->cfg.data_center, sizeof(handle->cfg.data_center), ""); + MESA_load_profile_string_def(profile, "public", "device_group", handle->cfg.device_group, sizeof(handle->cfg.device_group), ""); + MESA_load_profile_string_def(profile, "public", "device_id", handle->cfg.device_id, sizeof(handle->cfg.device_id), ""); + + const struct fieldstat_tag tags[] = { + {"data_center", TAG_CSTRING, {.value_str = handle->cfg.data_center}}, + {"device_group", TAG_CSTRING, {.value_str = handle->cfg.device_group}}, + {"device_id", TAG_CSTRING, {.value_str = handle->cfg.device_id}}, + }; + + handle->kfk = kfk; + handle->fs = fieldstat_easy_new(handle->cfg.thr_num, "proxy_rule_hits", tags, sizeof(tags) / sizeof(tags[0])); + if (!handle->fs) + { + goto error_out; + } + + handle->hit_count_idx = fieldstat_easy_register_counter(handle->fs, "hit_count"); + handle->in_bytes_idx = fieldstat_easy_register_counter(handle->fs, "in_bytes"); + handle->out_bytes_idx = fieldstat_easy_register_counter(handle->fs, "out_bytes"); + handle->in_pkts_idx = fieldstat_easy_register_counter(handle->fs, "in_pkts"); + handle->out_pkts_idx = fieldstat_easy_register_counter(handle->fs, "out_pkts"); + + if (pthread_create(&handle->tid, NULL, fs2kafka_thread_cycle, (void *)handle) < 0) + { + goto error_out; + } + + return handle; + +error_out: + metrics_destory(handle); + return NULL; +} + +void metrics_destory(struct metrics *handle) +{ + if (handle) + { + ATOMIC_SET(&handle->thr_need_exit, 1); + while (ATOMIC_READ(&handle->thr_is_runing)) + { + usleep(1000); + } + + if (handle->kfk) + { + kafka_destroy(handle->kfk); + handle->kfk = NULL; + } + + if (handle->fs) + { + fieldstat_easy_free(handle->fs); + handle->fs = NULL; + } + + free(handle); + handle = NULL; + } +} + +void metrics_single_session_output(struct session_node *node, void *ctx) +{ + int ret = 0; + int hit_count = 0; + uint16_t out_size = 0; + struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)ctx; + struct metrics *metrics = thread_ctx->ref_acceptor_ctx->metrics; + struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; + struct tfe_cmsg *cmsg = s_ctx->cmsg; + int thr_idx = thread_ctx->thread_index; + if (cmsg == NULL) + return; + + int c2s_dir = s_ctx->c2s_info.is_e2i_dir; + int c2s_rx_pkts = s_ctx->c2s_info.rx.n_pkts - s_ctx->c2s_info.rx_send_complete.n_pkts; + int c2s_rx_bytes = s_ctx->c2s_info.rx.n_bytes - s_ctx->c2s_info.rx_send_complete.n_bytes; + int s2c_dir = s_ctx->s2c_info.is_e2i_dir; + int s2c_rx_pkts = s_ctx->s2c_info.rx.n_pkts - s_ctx->s2c_info.rx_send_complete.n_pkts; + int s2c_rx_bytes = s_ctx->s2c_info.rx.n_bytes - s_ctx->s2c_info.rx_send_complete.n_bytes; + s_ctx->c2s_info.rx_send_complete = s_ctx->c2s_info.rx; + s_ctx->s2c_info.rx_send_complete = s_ctx->s2c_info.rx; + + if (c2s_rx_pkts == 0 && c2s_rx_bytes == 0 && s2c_rx_pkts == 0 && s2c_rx_bytes == 0) + return; + + int vsys_id = 0; + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_VSYS_ID, (unsigned char *)&vsys_id, sizeof(vsys_id), &out_size); + if (ret != 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch vsys_id from cmsg: %s", strerror(-ret)); + return; + } + + uint64_t rule_id = 0; + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *)&rule_id, sizeof(rule_id), &out_size); + if (ret != 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch rule_id from cmsg: %s", strerror(-ret)); + return; + } + + uint8_t hit_no_intercept = 0; + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept), &out_size); + if (ret != 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch hit_no_intercept from cmsg: %s", strerror(-ret)); + return; + } + + if (s_ctx->metric_hit == 0 && s_ctx->send_log_flag) + { + s_ctx->metric_hit = 1; + hit_count = 1; + } + + int in_pkts = 0; + int in_bytes = 0; + int out_pkts = 0; + int out_bytes = 0; + + // incoming : E2I 的流量 + // outgoing : I2E 的流量 + // first_ctr_packet_dir <==> client hello packet dir + // 1: E2I 0:I2E + if (c2s_dir == 1) + { + in_pkts += c2s_rx_pkts; + in_bytes += c2s_rx_bytes; + } + else + { + out_pkts += c2s_rx_pkts; + out_bytes += c2s_rx_bytes; + } + + if (s2c_dir == 1) + { + in_pkts += s2c_rx_pkts; + in_bytes += s2c_rx_bytes; + } + else + { + out_pkts += s2c_rx_pkts; + out_bytes += s2c_rx_bytes; + } + + int nr_tags = 0; + struct fieldstat_tag tags[5] = {0}; + FIELDSTAT_TAG_INIT(tags, nr_tags, "vsys_id", TAG_INTEGER, vsys_id); + nr_tags++; + FIELDSTAT_TAG_INIT(tags, nr_tags, "rule_id", TAG_INTEGER, rule_id); + nr_tags++; + uint8_t pinning_status = 0; + if (tfe_cmsg_get_value(cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&pinning_status, sizeof(pinning_status), &out_size) == 0) + { + FIELDSTAT_TAG_INIT(tags, nr_tags, "pinning_status", TAG_INTEGER, pinning_status); + nr_tags++; + } + // action : 2 Intercept; 3 No Intercept + FIELDSTAT_TAG_INIT(tags, nr_tags, "action", TAG_INTEGER, (hit_no_intercept == 1 ? 3 : 2)); + nr_tags++; + + if (hit_count > 0) + fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->hit_count_idx, tags, (size_t)nr_tags, hit_count); + + if (in_pkts > 0) + fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->in_pkts_idx, tags, (size_t)nr_tags, in_pkts); + + if (in_bytes > 0) + fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->in_bytes_idx, tags, (size_t)nr_tags, in_bytes); + + if (out_pkts > 0) + fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->out_pkts_idx, tags, (size_t)nr_tags, out_pkts); + + if (out_bytes > 0) + fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->out_bytes_idx, tags, (size_t)nr_tags, out_bytes); + return; +} + +void metrics_all_session_output(struct packet_io_thread_ctx *thread_ctx) +{ + if (thread_ctx == NULL) + return; + + struct session_table *session_table = thread_ctx->session_table; + session_foreach(session_table, metrics_single_session_output, thread_ctx); + return; +} + +int metrics_get_interval(struct metrics *handle) +{ + return handle->cfg.output_fs_interval_ms; +} \ No newline at end of file diff --git a/common/src/tfe_fieldstat.cpp b/common/src/tfe_fieldstat.cpp index f38bced..f83a36f 100644 --- a/common/src/tfe_fieldstat.cpp +++ b/common/src/tfe_fieldstat.cpp @@ -5,159 +5,6 @@ #include "tfe_resource.h" #include "tfe_packet_io.h" -void tfe_set_intercept_metric(struct tfe_fieldstat_easy_t *fieldstat, struct session_ctx *s_ctx, int thread_id, int is_session_close) -{ - int ret; - int hit_count = 0; - uint16_t out_size; - struct tfe_cmsg *cmsg = s_ctx->cmsg; - struct timespec current_time; - - if (cmsg == NULL) - { - return; - } - - if (s_ctx->metric_hit == 0) { - int flag = 0; - flag = tfe_cmsg_get_flag(cmsg); - if ((flag & TFE_CMSG_FLAG_USER0) == 0) { - return; - } - s_ctx->metric_hit = 1; - hit_count = 1; - } - - if (!is_session_close) - { - clock_gettime(CLOCK_MONOTONIC, ¤t_time); - if (current_time.tv_sec - s_ctx->metrics_last_time.tv_sec < 1) - { - return; - } - } - - s_ctx->metrics_last_time = current_time; - int downstream_dir = s_ctx->c2s_info.is_e2i_dir; - int downstream_rx_pkts = s_ctx->c2s_info.rx.n_pkts - s_ctx->c2s_info.rx_send_complete.n_pkts; - int downstream_rx_bytes = s_ctx->c2s_info.rx.n_bytes - s_ctx->c2s_info.rx_send_complete.n_bytes; - int upstream_dir = s_ctx->s2c_info.is_e2i_dir; - int upstream_rx_pkts = s_ctx->s2c_info.rx.n_pkts - s_ctx->s2c_info.rx_send_complete.n_pkts; - int upstream_rx_bytes = s_ctx->s2c_info.rx.n_bytes - s_ctx->s2c_info.rx_send_complete.n_bytes; - s_ctx->c2s_info.rx_send_complete = s_ctx->c2s_info.rx; - s_ctx->s2c_info.rx_send_complete = s_ctx->s2c_info.rx; - - int vsys_id = 0; - ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_VSYS_ID, (unsigned char *)&vsys_id, sizeof(vsys_id), &out_size); - if (ret != 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch vsys_id from cmsg: %s", strerror(-ret)); - return; - } - - uint64_t rule_id = 0; - ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *)&rule_id, sizeof(rule_id), &out_size); - if (ret != 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch rule_id from cmsg: %s", strerror(-ret)); - return; - } - - uint8_t hit_no_intercept = 0; - ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept), &out_size); - if (ret != 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch hit_no_intercept from cmsg: %s", strerror(-ret)); - return; - } - - int in_pkts = 0; - int in_bytes = 0; - int out_pkts = 0; - int out_bytes = 0; - - // incoming : E2I 的流量 - // outgoing : I2E 的流量 - // first_ctr_packet_dir <==> client hello packet dir - // 1: E2I 0:I2E - if (downstream_dir == 1) - { - in_pkts += downstream_rx_pkts; - in_bytes += downstream_rx_bytes; - } - else - { - out_pkts += downstream_rx_pkts; - out_bytes += downstream_rx_bytes; - } - - if (upstream_dir == 1) - { - in_pkts += upstream_rx_pkts; - in_bytes += upstream_rx_bytes; - } - else - { - out_pkts += upstream_rx_pkts; - out_bytes += upstream_rx_bytes; - } - - int nr_tags = 0; - struct fieldstat_tag temp_tags[TAG_MAX] = {0}; - - temp_tags[nr_tags].key = "vsys_id"; - temp_tags[nr_tags].type = TAG_INTEGER; - temp_tags[nr_tags].value_longlong = vsys_id; - nr_tags++; - - temp_tags[nr_tags].key = "rule_id"; - temp_tags[nr_tags].type = TAG_INTEGER; - temp_tags[nr_tags].value_longlong = rule_id; - nr_tags++; - - uint8_t pinning_status = 0; - if (tfe_cmsg_get_value(cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&pinning_status, sizeof(pinning_status), &out_size) == 0) - { - temp_tags[nr_tags].key = "pinning_status"; - temp_tags[nr_tags].type = TAG_INTEGER; - temp_tags[nr_tags].value_longlong = pinning_status; - nr_tags++; - } - - // action : 2 Intercept; 3 No Intercept - temp_tags[nr_tags].key = "action"; - temp_tags[nr_tags].type = TAG_INTEGER; - temp_tags[nr_tags].value_longlong = (hit_no_intercept == 1 ? 3 : 2); - nr_tags++; - - // sub_action not need for intercept metrics - - if (hit_count > 0) - { - fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, fieldstat->counter_array[COLUMN_HIT_COUNT], temp_tags, (size_t)nr_tags, hit_count); - } - - if (in_pkts > 0) - { - fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, fieldstat->counter_array[COLUMN_IN_PKTS], temp_tags, (size_t)nr_tags, in_pkts); - } - - if (in_bytes > 0) - { - fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, fieldstat->counter_array[COLUMN_IN_BYTES], temp_tags, (size_t)nr_tags, in_bytes); - } - - if (out_pkts > 0) - { - fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, fieldstat->counter_array[COLUMN_OUT_PKTS], temp_tags, (size_t)nr_tags, out_pkts); - } - - if (out_bytes > 0) - { - fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, fieldstat->counter_array[COLUMN_OUT_BYTES], temp_tags, (size_t)nr_tags, out_bytes); - } -} - int tfe_fieldstat_easy_incrby(struct tfe_fieldstat_easy_t *fieldstat, unsigned int counter_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id) { return fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, counter_id, tags, (size_t)n_tags, value); diff --git a/common/src/tfe_packet_io.cpp b/common/src/tfe_packet_io.cpp index f3525dc..9950347 100644 --- a/common/src/tfe_packet_io.cpp +++ b/common/src/tfe_packet_io.cpp @@ -35,6 +35,7 @@ #include "dablooms.h" #include "timestamp.h" #include "tfe_dp_trace.h" +#include "metrics.h" /****************************************************************************** * Struct @@ -1343,16 +1344,15 @@ static int handle_session_closing(struct metadata *meta, marsio_buff_t *rx_buff, struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct packet_io *packet_io = thread->ref_io; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; - struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; void * logger = thread->logger; struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id); if (node) { struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; + metrics_single_session_output(node, thread); TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, s_ctx->policy_ids, meta->session_id, "closing", NULL, NULL); - tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 1); session_table_delete_by_id(thread->session_table, meta->session_id); ATOMIC_DEC(&(packet_io_fs->session_num)); return 0; @@ -1444,7 +1444,6 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx) { struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; - struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; struct packet_io_fs *packet_io_fs = thread->ret_fs_state; struct packet pkt; @@ -1521,9 +1520,9 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len); } - tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 0); flag = tfe_cmsg_get_flag(s_ctx->cmsg); if (flag & TFE_CMSG_FLAG_USER0) { + s_ctx->send_log_flag = 1; send_event_log(s_ctx, thread_seq, ctx); tfe_cmsg_set_flag(s_ctx->cmsg, TFE_CMSG_FLAG_INIT); } @@ -1577,7 +1576,6 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx is_ipv4 = s_ctx->s2c_info.is_ipv4; throughput_metrics_inc(&s_ctx->s2c_info.rx, 1, raw_len); } - tfe_set_intercept_metric(acceptor_ctx->metric, s_ctx, thread_seq, 0); if (header != NULL) { char *packet_buff = NULL; @@ -1614,6 +1612,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx flag = tfe_cmsg_get_flag(s_ctx->cmsg); if (flag & TFE_CMSG_FLAG_USER0) { + s_ctx->send_log_flag = 1; send_event_log(s_ctx, thread_seq, ctx); tfe_cmsg_set_flag(s_ctx->cmsg, TFE_CMSG_FLAG_INIT); } diff --git a/common/src/tfe_session_table.cpp b/common/src/tfe_session_table.cpp index 46df666..237537b 100644 --- a/common/src/tfe_session_table.cpp +++ b/common/src/tfe_session_table.cpp @@ -211,3 +211,18 @@ struct session_node *session_table_search_by_addr(struct session_table *table, c return temp; } + +void session_foreach(struct session_table *table, void (*func)(struct session_node *, void *), void *ctx) +{ + struct session_node *temp = NULL; + struct session_node *node = NULL; + + if (!func) + return; + + HASH_ITER(hh1, table->root_by_id, node, temp) + { + func(node, ctx); + } + return; +} \ No newline at end of file diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf index d4b17a9..ddc6e7a 100644 --- a/conf/tfe/tfe.conf +++ b/conf/tfe/tfe.conf @@ -27,6 +27,10 @@ data_center=center-xxg-tsgx device_group=group-xxg-tsgx device_id=9800165603247024 +[metrics] +output_fs_interval_ms=500 +output_kafka_interval_ms=1000 + # for enable kni v3 [nfq] device=tap0 diff --git a/platform/src/acceptor_kni_v4.cpp b/platform/src/acceptor_kni_v4.cpp index b4e25e1..c49c90e 100644 --- a/platform/src/acceptor_kni_v4.cpp +++ b/platform/src/acceptor_kni_v4.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include "io_uring.h" #include "tfe_packet_io_fs.h" #include "tfe_tcp_restore.h" @@ -22,6 +23,7 @@ #include "tfe_fieldstat.h" #include "dablooms.h" #include "timestamp.h" +#include "metrics.h" void * g_packet_io_logger = NULL; @@ -39,37 +41,13 @@ static int tap_read(int tap_fd, char *buff, int buff_size, void *logger) return ret; } -static struct tfe_fieldstat_easy_t *create_fieldstat4_instance(const char *profile, const char *section, int max_thread, void *logger) -{ - int cycle=0; - char app_name[TFE_STRING_MAX]={0}; - char outpath[TFE_STRING_MAX]={0}; - struct tfe_fieldstat_easy_t *fieldstat_easy=NULL; - - MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "metric"); - MESA_load_profile_int_def(profile, section, "cycle", &cycle, 5); - MESA_load_profile_string_def(profile, section, "outpath", outpath, sizeof(outpath), "metrics/porxy_intercept_fieldstat.json"); - - fieldstat_easy = tfe_fieldstat_easy_create(app_name, outpath, cycle, max_thread, logger); - if (fieldstat_easy == NULL) - { - TFE_LOG_ERROR(logger, "tfe fieldstat init failed, error to create fieldstat metric."); - return NULL; - } - TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name); - TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle); - TFE_LOG_INFO(logger, "tfe fieldstat outpath : %s", outpath); - - return fieldstat_easy; -} - void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx) { if (ctx) { packet_io_destory(ctx->io); packet_io_fs_destory(ctx->packet_io_fs); - tfe_fieldstat_easy_destroy(ctx->metric); + metrics_destory(ctx->metrics); free(ctx); ctx = NULL; } @@ -113,8 +91,8 @@ struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile, void *logger) goto error_out; } - ctx->metric = create_fieldstat4_instance(profile, "proxy_hits", ctx->nr_worker_threads, logger); - if(ctx->metric == NULL) + ctx->metrics = metrics_create(profile, tfe_get_kafka_handle()); + if(ctx->metrics == NULL) { goto error_out; } @@ -148,6 +126,13 @@ static void *worker_thread_cycle(void *arg) struct io_uring_instance *io_uring_on_tap_c = thread_ctx->tap_ctx->io_uring_c; struct io_uring_instance *io_uring_on_tap_s = thread_ctx->tap_ctx->io_uring_s; + struct timespec current_time; + clock_gettime(CLOCK_MONOTONIC, ¤t_time); + int timeout_ms = 0; + uint64_t current_timestamp = current_time.tv_sec * 1000 + current_time.tv_nsec / 1000000; + uint64_t metrics_last_send_ms = current_timestamp; + uint64_t metrics_output_interval_ms = metrics_get_interval(thread_ctx->ref_acceptor_ctx->metrics); + snprintf(thread_name, sizeof(thread_name), "pkt:worker-%d", thread_index); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); @@ -200,9 +185,16 @@ static void *worker_thread_cycle(void *arg) } } + clock_gettime(CLOCK_MONOTONIC, ¤t_time); + current_timestamp = current_time.tv_sec * 1000 + current_time.tv_nsec / 1000000; if (n_pkt_recv == 0) { - packet_io_thread_wait(handle, thread_ctx, -1); + timeout_ms = metrics_last_send_ms + metrics_output_interval_ms - current_timestamp; + if (timeout_ms <= 0) + { + timeout_ms = 0; + } + packet_io_thread_wait(handle, thread_ctx, timeout_ms); } if (ATOMIC_READ(&thread_ctx->session_table_need_reset) > 0) @@ -210,6 +202,12 @@ static void *worker_thread_cycle(void *arg) session_table_reset(thread_ctx->session_table); ATOMIC_ZERO(&thread_ctx->session_table_need_reset); } + + if (current_timestamp - metrics_last_send_ms >= metrics_output_interval_ms) + { + metrics_all_session_output(thread_ctx); + metrics_last_send_ms = current_timestamp; + } } error_out: