diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index f2d9570..0240edc 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -5,7 +5,7 @@ src/tap.cpp src/io_uring.cpp src/intercept_policy.cpp src/tfe_fieldstat.cpp src/tuple.cpp src/tfe_packet_io.cpp src/tfe_session_table.cpp src/tfe_ctrl_packet.cpp src/packet.cpp src/tfe_packet_io_fs.cpp - src/mpack.cpp src/dablooms.cpp src/murmur.cpp src/timestamp.cpp src/metrics.cpp) + src/mpack.cpp src/dablooms.cpp src/murmur.cpp src/timestamp.cpp) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/../bpf/) target_include_directories(common PRIVATE ${CMAKE_CURRENT_LIST_DIR}/../platform/include/internal) diff --git a/common/include/kafka.h b/common/include/kafka.h index 4a242ed..814be40 100644 --- a/common/include/kafka.h +++ b/common/include/kafka.h @@ -23,7 +23,7 @@ void kafka_destroy(struct kafka *handle); // return 0: if success // return -1: if failed int kafka_send(struct kafka *handle, enum topic_idx idx, const char *data, int len); - +int kafka_send2(struct kafka *handle, enum topic_idx idx, const char *data, int len); #ifdef __cplusplus } #endif diff --git a/common/include/metrics.h b/common/include/metrics.h deleted file mode 100644 index 4096e59..0000000 --- a/common/include/metrics.h +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef _METRICS_H -#define _METRICS_H - -#ifdef __cplusplus -extern "C" -{ -#endif - -#include "kafka.h" -#include "tfe_packet_io.h" - -struct metrics *metrics_create(const char *profile, struct kafka *kfk); -void metrics_destory(struct metrics *handle); - -void metrics_single_session_output(struct session_node *node, void *ctx); -void metrics_all_session_output(struct packet_io_thread_ctx *thread_ctx); -int metrics_get_interval(struct metrics *handle); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/common/include/tfe_fieldstat.h b/common/include/tfe_fieldstat.h index de66f90..9e02c2c 100644 --- a/common/include/tfe_fieldstat.h +++ b/common/include/tfe_fieldstat.h @@ -9,6 +9,9 @@ extern "C" #include #include "fieldstat/fieldstat_easy.h" +#define FIELDSTAT_TAG_INIT(ptr, index, _key, _type, _value) \ + do { ptr[index].key = _key; ptr[index].type = _type; ptr[index].value_longlong = _value; } while(0) + enum metric_columns_index { COLUMN_HIT_COUNT = 0, @@ -29,18 +32,45 @@ enum metric_tags_index TAG_MAX }; -struct tfe_fieldstat_easy_t +struct fieldstat_easy_intercept { - int table_id; + int max_thread; + int hit_count_idx; + int in_bytes_idx; + int out_bytes_idx; + int in_pkts_idx; + int out_pkts_idx; + int output_fs_interval_ms; + struct fieldstat_easy *fs; +}; + +struct filedstat_easy_manipulation +{ + int table_id; int max_thread; struct fieldstat_tag **tags; int counter_array[COLUMN_MAX]; - struct fieldstat_easy *fseasy; + struct fieldstat_easy *fs; }; -struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(char *app_name, char *outpath, int cycle, int max_thread, void *local_logger); +struct tfe_fieldstat_easy_t +{ + pthread_t tid; + int thr_is_runing; + int thr_need_exit; + int output_kafka_interval_ms; + struct fieldstat_easy_intercept *intercept; + struct filedstat_easy_manipulation *manipulation; +}; + +int tfe_fieldstat_get_output_interval(struct fieldstat_easy_intercept *fieldstat); +int tfe_fieldstat_intercept_incrby(struct fieldstat_easy_intercept *fieldstat, void *val_data, int thread_index); +int tfe_fieldstat_manipulation_incrby(struct filedstat_easy_manipulation *fieldstat, unsigned int counter_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id); + +struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(int output_kafka_interval_ms); +struct fieldstat_easy_intercept *tfe_fieldstat_easy_intercept_create(char *app_name, int max_thread, int output_fs_interval_ms, void *local_logger); +struct filedstat_easy_manipulation *tfe_fieldstat_easy_manipulation_create(char *app_name, char *outpath, int cycle, int max_thread, void *local_logger); void tfe_fieldstat_easy_destroy(struct tfe_fieldstat_easy_t *fieldstat); -int tfe_fieldstat_easy_incrby(struct tfe_fieldstat_easy_t *fieldstat, unsigned int counter_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id); #ifdef __cpluscplus } diff --git a/common/include/tfe_packet_io.h b/common/include/tfe_packet_io.h index 6880e5d..db6be9a 100644 --- a/common/include/tfe_packet_io.h +++ b/common/include/tfe_packet_io.h @@ -93,7 +93,7 @@ struct acceptor_kni_v4 struct packet_io *io; struct packet_io_fs *packet_io_fs; - struct metrics *metrics; + struct fieldstat_easy_intercept *metrics; struct packet_io_thread_ctx work_threads[TFE_THREAD_MAX]; struct tfe_proxy *ref_proxy; diff --git a/common/include/tfe_session_table.h b/common/include/tfe_session_table.h index 7eff076..0b11622 100644 --- a/common/include/tfe_session_table.h +++ b/common/include/tfe_session_table.h @@ -51,7 +51,7 @@ int session_table_delete_by_addr(struct session_table *table, const struct tuple struct session_node *session_table_search_by_id(struct session_table *table, uint64_t session_id); struct session_node *session_table_search_by_addr(struct session_table *table, const struct tuple4 *session_addr); -void session_foreach(struct session_table *table, void (*func)(struct session_node *, void *), void *ctx); +void session_foreach(struct session_table *table, struct fieldstat_easy_intercept *metrics, int (*func)(struct fieldstat_easy_intercept *, void *, int), int thread_index); #ifdef __cpluscplus } #endif diff --git a/common/src/kafka.cpp b/common/src/kafka.cpp index a82c641..cab4642 100644 --- a/common/src/kafka.cpp +++ b/common/src/kafka.cpp @@ -216,6 +216,19 @@ void kafka_destroy(struct kafka *handle) } } +int kafka_send2(struct kafka *handle, enum topic_idx idx, const char *data, int len) +{ + if (handle && handle->pppt[idx] && handle->pppt[idx]->topic) + { + if(rd_kafka_produce(handle->pppt[idx]->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL) == -1) + { + TFE_LOG_ERROR(g_default_logger, "KAFKA: failed to produce message with topic[%d], %s", idx, rd_kafka_err2str(rd_kafka_last_error())); + return -1; + } + } + return 0; +} + int kafka_send(struct kafka *handle, enum topic_idx idx, const char *data, int len) { if (!handle) diff --git a/common/src/metrics.cpp b/common/src/metrics.cpp deleted file mode 100644 index 7b6ed5f..0000000 --- a/common/src/metrics.cpp +++ /dev/null @@ -1,290 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - -#include "uthash.h" -#include "metrics.h" -#include "tfe_utils.h" -#include "tfe_cmsg.h" -#include "tfe_session_table.h" - -#define FIELDSTAT_TAG_INIT(ptr, index, _key, _type, _value) \ - do { \ - ptr[index].key = _key; \ - ptr[index].type = _type; \ - ptr[index].value_longlong = _value; \ - }while(0) - -struct config -{ - uint16_t thr_num; - int output_fs_interval_ms; - int output_kafka_interval_ms; - char data_center[256]; - char device_group[256]; - char device_id[256]; -}; - -struct metrics -{ - struct config cfg; - - int hit_count_idx; - int in_bytes_idx; - int out_bytes_idx; - int in_pkts_idx; - int out_pkts_idx; - - pthread_t tid; - int thr_is_runing; - int thr_need_exit; - struct kafka *kfk; - struct fieldstat_easy *fs; -}; - -/****************************************************************************** - * Private API - ******************************************************************************/ - -static void *fs2kafka_thread_cycle(void *arg) -{ - struct metrics *handle = (struct metrics *)arg; - ATOMIC_SET(&handle->thr_is_runing, 1); - - char **ptr = NULL; - size_t len = 0; - while (!ATOMIC_READ(&handle->thr_need_exit)) - { - fieldstat_easy_output_array_and_reset(handle->fs, &ptr, &len); - if (ptr) - { - for (size_t i = 0; i < len; i++) - { - kafka_send(handle->kfk, TOPIC_RULE_HITS, ptr[i], strlen(ptr[i])); - free(ptr[i]); - ptr[i] = NULL; - } - free(ptr); - } - - usleep(handle->cfg.output_kafka_interval_ms * 1000); - } - ATOMIC_SET(&handle->thr_is_runing, 0); - - return NULL; -} - -/****************************************************************************** - * Public API - ******************************************************************************/ - -struct metrics *metrics_create(const char *profile, struct kafka *kfk) -{ - struct metrics *handle = (struct metrics *)calloc(1, sizeof(struct metrics)); - if (!handle) - { - return NULL; - } - - MESA_load_profile_int_def(profile, "packet_io", "packet_io_threads", (int *)&(handle->cfg.thr_num), 0); - MESA_load_profile_int_def(profile, "metrics", "output_fs_interval_ms", &(handle->cfg.output_fs_interval_ms), 500); - MESA_load_profile_int_def(profile, "metrics", "output_kafka_interval_ms", &(handle->cfg.output_kafka_interval_ms), 1000); - MESA_load_profile_string_def(profile, "public", "data_center", handle->cfg.data_center, sizeof(handle->cfg.data_center), ""); - MESA_load_profile_string_def(profile, "public", "device_group", handle->cfg.device_group, sizeof(handle->cfg.device_group), ""); - MESA_load_profile_string_def(profile, "public", "device_id", handle->cfg.device_id, sizeof(handle->cfg.device_id), ""); - - const struct fieldstat_tag tags[] = { - {"data_center", TAG_CSTRING, {.value_str = handle->cfg.data_center}}, - {"device_group", TAG_CSTRING, {.value_str = handle->cfg.device_group}}, - {"device_id", TAG_CSTRING, {.value_str = handle->cfg.device_id}}, - }; - - handle->kfk = kfk; - handle->fs = fieldstat_easy_new(handle->cfg.thr_num, "proxy_rule_hits", tags, sizeof(tags) / sizeof(tags[0])); - if (!handle->fs) - { - goto error_out; - } - - handle->hit_count_idx = fieldstat_easy_register_counter(handle->fs, "hit_count"); - handle->in_bytes_idx = fieldstat_easy_register_counter(handle->fs, "in_bytes"); - handle->out_bytes_idx = fieldstat_easy_register_counter(handle->fs, "out_bytes"); - handle->in_pkts_idx = fieldstat_easy_register_counter(handle->fs, "in_pkts"); - handle->out_pkts_idx = fieldstat_easy_register_counter(handle->fs, "out_pkts"); - - if (pthread_create(&handle->tid, NULL, fs2kafka_thread_cycle, (void *)handle) < 0) - { - goto error_out; - } - - return handle; - -error_out: - metrics_destory(handle); - return NULL; -} - -void metrics_destory(struct metrics *handle) -{ - if (handle) - { - ATOMIC_SET(&handle->thr_need_exit, 1); - while (ATOMIC_READ(&handle->thr_is_runing)) - { - usleep(1000); - } - - if (handle->kfk) - { - kafka_destroy(handle->kfk); - handle->kfk = NULL; - } - - if (handle->fs) - { - fieldstat_easy_free(handle->fs); - handle->fs = NULL; - } - - free(handle); - handle = NULL; - } -} - -void metrics_single_session_output(struct session_node *node, void *ctx) -{ - int ret = 0; - int hit_count = 0; - uint16_t out_size = 0; - struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)ctx; - struct metrics *metrics = thread_ctx->ref_acceptor_ctx->metrics; - struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; - struct tfe_cmsg *cmsg = s_ctx->cmsg; - int thr_idx = thread_ctx->thread_index; - if (cmsg == NULL) - return; - - int c2s_dir = s_ctx->c2s_info.is_e2i_dir; - int c2s_rx_pkts = s_ctx->c2s_info.rx.n_pkts - s_ctx->c2s_info.rx_send_complete.n_pkts; - int c2s_rx_bytes = s_ctx->c2s_info.rx.n_bytes - s_ctx->c2s_info.rx_send_complete.n_bytes; - int s2c_dir = s_ctx->s2c_info.is_e2i_dir; - int s2c_rx_pkts = s_ctx->s2c_info.rx.n_pkts - s_ctx->s2c_info.rx_send_complete.n_pkts; - int s2c_rx_bytes = s_ctx->s2c_info.rx.n_bytes - s_ctx->s2c_info.rx_send_complete.n_bytes; - s_ctx->c2s_info.rx_send_complete = s_ctx->c2s_info.rx; - s_ctx->s2c_info.rx_send_complete = s_ctx->s2c_info.rx; - - if (c2s_rx_pkts == 0 && c2s_rx_bytes == 0 && s2c_rx_pkts == 0 && s2c_rx_bytes == 0) - return; - - int vsys_id = 0; - ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_VSYS_ID, (unsigned char *)&vsys_id, sizeof(vsys_id), &out_size); - if (ret != 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch vsys_id from cmsg: %s", strerror(-ret)); - return; - } - - uint64_t rule_id = 0; - ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *)&rule_id, sizeof(rule_id), &out_size); - if (ret != 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch rule_id from cmsg: %s", strerror(-ret)); - return; - } - - uint8_t hit_no_intercept = 0; - ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept), &out_size); - if (ret != 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch hit_no_intercept from cmsg: %s", strerror(-ret)); - return; - } - - if (s_ctx->metric_hit == 0 && s_ctx->send_log_flag) - { - s_ctx->metric_hit = 1; - hit_count = 1; - } - - int in_pkts = 0; - int in_bytes = 0; - int out_pkts = 0; - int out_bytes = 0; - - // incoming : E2I 的流量 - // outgoing : I2E 的流量 - // first_ctr_packet_dir <==> client hello packet dir - // 1: E2I 0:I2E - if (c2s_dir == 1) - { - in_pkts += c2s_rx_pkts; - in_bytes += c2s_rx_bytes; - } - else - { - out_pkts += c2s_rx_pkts; - out_bytes += c2s_rx_bytes; - } - - if (s2c_dir == 1) - { - in_pkts += s2c_rx_pkts; - in_bytes += s2c_rx_bytes; - } - else - { - out_pkts += s2c_rx_pkts; - out_bytes += s2c_rx_bytes; - } - - int nr_tags = 0; - struct fieldstat_tag tags[5] = {0}; - FIELDSTAT_TAG_INIT(tags, nr_tags, "vsys_id", TAG_INTEGER, vsys_id); - nr_tags++; - FIELDSTAT_TAG_INIT(tags, nr_tags, "rule_id", TAG_INTEGER, rule_id); - nr_tags++; - uint8_t pinning_status = 0; - if (tfe_cmsg_get_value(cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&pinning_status, sizeof(pinning_status), &out_size) == 0) - { - FIELDSTAT_TAG_INIT(tags, nr_tags, "pinning_status", TAG_INTEGER, pinning_status); - nr_tags++; - } - // action : 2 Intercept; 3 No Intercept - FIELDSTAT_TAG_INIT(tags, nr_tags, "action", TAG_INTEGER, (hit_no_intercept == 1 ? 3 : 2)); - nr_tags++; - - if (hit_count > 0) - fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->hit_count_idx, tags, (size_t)nr_tags, hit_count); - - if (in_pkts > 0) - fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->in_pkts_idx, tags, (size_t)nr_tags, in_pkts); - - if (in_bytes > 0) - fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->in_bytes_idx, tags, (size_t)nr_tags, in_bytes); - - if (out_pkts > 0) - fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->out_pkts_idx, tags, (size_t)nr_tags, out_pkts); - - if (out_bytes > 0) - fieldstat_easy_counter_incrby(metrics->fs, thr_idx, metrics->out_bytes_idx, tags, (size_t)nr_tags, out_bytes); - return; -} - -void metrics_all_session_output(struct packet_io_thread_ctx *thread_ctx) -{ - if (thread_ctx == NULL) - return; - - struct session_table *session_table = thread_ctx->session_table; - session_foreach(session_table, metrics_single_session_output, thread_ctx); - return; -} - -int metrics_get_interval(struct metrics *handle) -{ - return handle->cfg.output_fs_interval_ms; -} \ No newline at end of file diff --git a/common/src/tfe_fieldstat.cpp b/common/src/tfe_fieldstat.cpp index f83a36f..74c73d7 100644 --- a/common/src/tfe_fieldstat.cpp +++ b/common/src/tfe_fieldstat.cpp @@ -1,39 +1,248 @@ #include +#include #include #include "tfe_stream.h" #include "tfe_resource.h" #include "tfe_packet_io.h" -int tfe_fieldstat_easy_incrby(struct tfe_fieldstat_easy_t *fieldstat, unsigned int counter_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id) +int tfe_fieldstat_intercept_incrby(struct fieldstat_easy_intercept *metrics, void *val_data, int thread_index) { - return fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, counter_id, tags, (size_t)n_tags, value); + int ret = 0; + int hit_count = 0; + uint16_t out_size = 0; + + struct session_ctx *s_ctx = (struct session_ctx *)val_data; + struct tfe_cmsg *cmsg = s_ctx->cmsg; + if (cmsg == NULL) + { + return 0; + } + + int c2s_dir = s_ctx->c2s_info.is_e2i_dir; + int c2s_rx_pkts = s_ctx->c2s_info.rx.n_pkts - s_ctx->c2s_info.rx_send_complete.n_pkts; + int c2s_rx_bytes = s_ctx->c2s_info.rx.n_bytes - s_ctx->c2s_info.rx_send_complete.n_bytes; + int s2c_dir = s_ctx->s2c_info.is_e2i_dir; + int s2c_rx_pkts = s_ctx->s2c_info.rx.n_pkts - s_ctx->s2c_info.rx_send_complete.n_pkts; + int s2c_rx_bytes = s_ctx->s2c_info.rx.n_bytes - s_ctx->s2c_info.rx_send_complete.n_bytes; + s_ctx->c2s_info.rx_send_complete = s_ctx->c2s_info.rx; + s_ctx->s2c_info.rx_send_complete = s_ctx->s2c_info.rx; + + if (c2s_rx_pkts == 0 && c2s_rx_bytes == 0 && s2c_rx_pkts == 0 && s2c_rx_bytes == 0) + { + return 0; + } + + int vsys_id = 0; + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_VSYS_ID, (unsigned char *)&vsys_id, sizeof(vsys_id), &out_size); + if (ret != 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch vsys_id from cmsg: %s", strerror(-ret)); + return 0; + } + + uint64_t rule_id = 0; + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *)&rule_id, sizeof(rule_id), &out_size); + if (ret != 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch rule_id from cmsg: %s", strerror(-ret)); + return 0; + } + + uint8_t hit_no_intercept = 0; + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept), &out_size); + if (ret != 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch hit_no_intercept from cmsg: %s", strerror(-ret)); + return 0; + } + + if (s_ctx->metric_hit == 0 && s_ctx->send_log_flag) + { + s_ctx->metric_hit = 1; + hit_count = 1; + } + + int in_pkts = 0; + int in_bytes = 0; + int out_pkts = 0; + int out_bytes = 0; + + // incoming : E2I + // outgoing : I2E + // first_ctr_packet_dir <==> client hello packet dir + // 1: E2I 0:I2E + if (c2s_dir == 1) + { + in_pkts += c2s_rx_pkts; + in_bytes += c2s_rx_bytes; + } + else + { + out_pkts += c2s_rx_pkts; + out_bytes += c2s_rx_bytes; + } + + if (s2c_dir == 1) + { + in_pkts += s2c_rx_pkts; + in_bytes += s2c_rx_bytes; + } + else + { + out_pkts += s2c_rx_pkts; + out_bytes += s2c_rx_bytes; + } + + int nr_tags = 0; + struct fieldstat_tag tags[5] = {0}; + FIELDSTAT_TAG_INIT(tags, nr_tags, "vsys_id", TAG_INTEGER, vsys_id); + nr_tags++; + FIELDSTAT_TAG_INIT(tags, nr_tags, "rule_id", TAG_INTEGER, rule_id); + nr_tags++; + uint8_t pinning_status = 0; + if (tfe_cmsg_get_value(cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&pinning_status, sizeof(pinning_status), &out_size) == 0) + { + FIELDSTAT_TAG_INIT(tags, nr_tags, "pinning_status", TAG_INTEGER, pinning_status); + nr_tags++; + } + // action : 2 Intercept; 3 No Intercept + FIELDSTAT_TAG_INIT(tags, nr_tags, "action", TAG_INTEGER, (hit_no_intercept == 1 ? 3 : 2)); + nr_tags++; + + if (hit_count > 0) + fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->hit_count_idx, tags, (size_t)nr_tags, hit_count); + + if (in_pkts > 0) + fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->in_pkts_idx, tags, (size_t)nr_tags, in_pkts); + + if (in_bytes > 0) + fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->in_bytes_idx, tags, (size_t)nr_tags, in_bytes); + + if (out_pkts > 0) + fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->out_pkts_idx, tags, (size_t)nr_tags, out_pkts); + + if (out_bytes > 0) + fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->out_bytes_idx, tags, (size_t)nr_tags, out_bytes); + return 1; } -struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(char *app_name, char *outpath, int cycle, int max_thread, void *local_logger) +int tfe_fieldstat_get_output_interval(struct fieldstat_easy_intercept *fieldstat) +{ + return fieldstat->output_fs_interval_ms; +} + +int tfe_fieldstat_manipulation_incrby(struct filedstat_easy_manipulation *fieldstat, unsigned int counter_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id) +{ + return fieldstat_easy_counter_incrby(fieldstat->fs, thread_id, counter_id, tags, (size_t)n_tags, value); +} + +static void *tfe_fieldstat_thread_cycle(void *arg) +{ + struct tfe_fieldstat_easy_t *tfe_fieldstat4_easy = (struct tfe_fieldstat_easy_t *)arg; + ATOMIC_SET(&tfe_fieldstat4_easy->thr_is_runing, 1); + + char **ptr = NULL; + size_t len = 0; + while (!ATOMIC_READ(&tfe_fieldstat4_easy->thr_need_exit)) + { + if(tfe_fieldstat4_easy->intercept) + { + fieldstat_easy_output_array_and_reset(tfe_fieldstat4_easy->intercept->fs, &ptr, &len); + if (ptr) + { + for (size_t i = 0; i < len; i++) + { + kafka_send(tfe_get_kafka_handle(), TOPIC_RULE_HITS, ptr[i], strlen(ptr[i])); + free(ptr[i]); + ptr[i] = NULL; + } + free(ptr); + } + } + + if(tfe_fieldstat4_easy->manipulation) + { + fieldstat_easy_output_array_and_reset(tfe_fieldstat4_easy->manipulation->fs, &ptr, &len); + if (ptr) + { + for (size_t i = 0; i < len; i++) + { + kafka_send(tfe_get_kafka_handle(), TOPIC_RULE_HITS, ptr[i], strlen(ptr[i])); + free(ptr[i]); + ptr[i] = NULL; + } + free(ptr); + } + } + + usleep(tfe_fieldstat4_easy->output_kafka_interval_ms * 1000); + } + ATOMIC_SET(&tfe_fieldstat4_easy->thr_is_runing, 0); + + return NULL; +} + +struct fieldstat_easy_intercept *tfe_fieldstat_easy_intercept_create(char *app_name, int max_thread, int output_fs_interval_ms, void *local_logger) +{ + struct fieldstat_easy_intercept *fieldstat = ALLOC(struct fieldstat_easy_intercept, 1); + + const struct fieldstat_tag tags[] = { + {"data_center", TAG_CSTRING, {.value_str = tfe_get_data_center()}}, + {"device_group", TAG_CSTRING, {.value_str = tfe_get_device_group()}}, + {"device_id", TAG_CSTRING, {.value_str = tfe_get_device_id()}}, + }; + + fieldstat->fs = fieldstat_easy_new(max_thread, app_name, tags, sizeof(tags) / sizeof(tags[0])); + if (!fieldstat->fs) + { + TFE_LOG_ERROR(local_logger, "fieldstat4 easy intercept instance init failed."); + FREE(&fieldstat); + return NULL; + } + fieldstat->output_fs_interval_ms = output_fs_interval_ms; + fieldstat->hit_count_idx = fieldstat_easy_register_counter(fieldstat->fs, "hit_count"); + fieldstat->in_bytes_idx = fieldstat_easy_register_counter(fieldstat->fs, "in_bytes"); + fieldstat->out_bytes_idx = fieldstat_easy_register_counter(fieldstat->fs, "out_bytes"); + fieldstat->in_pkts_idx = fieldstat_easy_register_counter(fieldstat->fs, "in_pkts"); + fieldstat->out_pkts_idx = fieldstat_easy_register_counter(fieldstat->fs, "out_pkts"); + + return fieldstat; +} + +struct filedstat_easy_manipulation *tfe_fieldstat_easy_manipulation_create(char *app_name, char *outpath, int cycle, int max_thread, void *local_logger) { const char *counter_field[COLUMN_MAX] = {"hit_count", "in_bytes", "out_bytes", "in_pkts", "out_pkts"}; struct fieldstat_tag metric_tags[TAG_MAX - 1] = {{"vsys_id", TAG_INTEGER, -1}, {"rule_id", TAG_INTEGER, -1}, {"action", TAG_INTEGER, -1}, {"sub_action", TAG_CSTRING, -1}}; - struct tfe_fieldstat_easy_t *fieldstat = ALLOC(struct tfe_fieldstat_easy_t, 1); - - fieldstat->fseasy = fieldstat_easy_new(max_thread, app_name, NULL, 0); - if(!fieldstat->fseasy) + struct filedstat_easy_manipulation *fieldstat = ALLOC(struct filedstat_easy_manipulation, 1); + + const struct fieldstat_tag tags[] = { + {"data_center", TAG_CSTRING, {.value_str = tfe_get_data_center()}}, + {"device_group", TAG_CSTRING, {.value_str = tfe_get_device_group()}}, + {"device_id", TAG_CSTRING, {.value_str = tfe_get_device_id()}}, + }; + + fieldstat->fs = fieldstat_easy_new(max_thread, app_name, tags, sizeof(tags) / sizeof(tags[0])); + if(!fieldstat->fs) { - TFE_LOG_ERROR(local_logger, "fieldstat4 easy instance init failed."); + TFE_LOG_ERROR(local_logger, "fieldstat4 easy manipulation instance init failed."); FREE(&fieldstat); return NULL; } - fieldstat_easy_enable_auto_output(fieldstat->fseasy, outpath, cycle); + if(cycle > 0) + { + fieldstat_easy_enable_auto_output(fieldstat->fs, outpath, cycle); + } for(int i=0; icounter_array[i]=fieldstat_easy_register_counter(fieldstat->fseasy, counter_field[i]); + fieldstat->counter_array[i]=fieldstat_easy_register_counter(fieldstat->fs, counter_field[i]); if(fieldstat->counter_array[i] < 0) { TFE_LOG_ERROR(local_logger, "fieldstat4 easy register counter failed."); FREE(&fieldstat); - return NULL; + return NULL; } } @@ -47,23 +256,48 @@ struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(char *app_name, char *out return fieldstat; } +struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(int output_kafka_interval_ms) +{ + struct tfe_fieldstat_easy_t *fieldstat = ALLOC(struct tfe_fieldstat_easy_t, 1); + fieldstat->output_kafka_interval_ms = output_kafka_interval_ms; + + if (pthread_create(&fieldstat->tid, NULL, tfe_fieldstat_thread_cycle, (void *)fieldstat) < 0) + { + FREE(&fieldstat); + return NULL; + } + + return fieldstat; +} + void tfe_fieldstat_easy_destroy(struct tfe_fieldstat_easy_t *fieldstat) { if(fieldstat) { - if(fieldstat->fseasy) + if(fieldstat->manipulation) { - fieldstat_easy_free(fieldstat->fseasy); + if(fieldstat->manipulation->fs) + { + fieldstat_easy_free(fieldstat->manipulation->fs); + } + + for (int i = 0; i < fieldstat->manipulation->max_thread; i++) + { + if (fieldstat->manipulation->tags[i]) + { + FREE(&fieldstat->manipulation->tags[i]); + } + } + FREE(&fieldstat->manipulation->tags); } - for (int i = 0; i < fieldstat->max_thread; i++) + if(fieldstat->intercept) { - if (fieldstat->tags[i]) + if(fieldstat->intercept->fs) { - FREE(&fieldstat->tags[i]); + fieldstat_easy_free(fieldstat->intercept->fs); } } - FREE(&fieldstat->tags); FREE(&fieldstat); } } diff --git a/common/src/tfe_packet_io.cpp b/common/src/tfe_packet_io.cpp index 9950347..32e1250 100644 --- a/common/src/tfe_packet_io.cpp +++ b/common/src/tfe_packet_io.cpp @@ -35,7 +35,6 @@ #include "dablooms.h" #include "timestamp.h" #include "tfe_dp_trace.h" -#include "metrics.h" /****************************************************************************** * Struct @@ -1350,7 +1349,7 @@ static int handle_session_closing(struct metadata *meta, marsio_buff_t *rx_buff, if (node) { struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; - metrics_single_session_output(node, thread); + tfe_fieldstat_intercept_incrby(thread->ref_acceptor_ctx->metrics, s_ctx, thread->thread_index); TFE_LOG_INFO(logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id); tfe_dp_telemetry_on_ctrl_pkt(packet_io->instance, rx_buff, s_ctx->policy_ids, meta->session_id, "closing", NULL, NULL); session_table_delete_by_id(thread->session_table, meta->session_id); diff --git a/common/src/tfe_resource.cpp b/common/src/tfe_resource.cpp index e811436..3203775 100644 --- a/common/src/tfe_resource.cpp +++ b/common/src/tfe_resource.cpp @@ -11,7 +11,7 @@ #define MAAT_INPUT_FILE 2 static int scan_table_id[__SCAN_COMMON_TABLE_MAX]; -static struct tfe_fieldstat_easy_t *fieldstat_easy = NULL; +static struct tfe_fieldstat_easy_t *fieldstat4_easy = NULL; static char *device_tag=NULL; struct kafka *kafka_handle = NULL; @@ -65,30 +65,37 @@ struct maat *tfe_get_maat_handle() struct tfe_fieldstat_easy_t *tfe_get_fieldstat_handle() { - return fieldstat_easy; + return fieldstat4_easy; } -static struct tfe_fieldstat_easy_t *create_fieldstat4_instance(const char *profile, const char *section, int max_thread, void *logger) +static tfe_fieldstat_easy_t *create_fieldstat4_instance(const char *profile, const char *section, int max_thread, void *logger) { int cycle=0; char app_name[TFE_STRING_MAX]={0}; char outpath[TFE_STRING_MAX]={0}; - struct tfe_fieldstat_easy_t *fieldstat_easy=NULL; + int output_fs_interval_ms=0, output_kafka_interval_ms=0; + struct tfe_fieldstat_easy_t *fieldstat_easy=NULL; - MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "proxy_rule_hits"); - MESA_load_profile_int_def(profile, section, "cycle", &cycle, 5); - MESA_load_profile_string_def(profile, section, "outpath", outpath, sizeof(outpath), "metrics/porxy_fieldstat.json"); + MESA_load_profile_int_def(profile, section, "output_fs_interval_ms", &output_fs_interval_ms, 500); + MESA_load_profile_int_def(profile, section, "output_kafka_interval_ms", &output_kafka_interval_ms, 1000); - fieldstat_easy = tfe_fieldstat_easy_create(app_name, outpath, cycle, max_thread, logger); + fieldstat_easy = tfe_fieldstat_easy_create(output_kafka_interval_ms); if (fieldstat_easy == NULL) { TFE_LOG_ERROR(logger, "tfe fieldstat init failed, error to create fieldstat metric."); return NULL; } - TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name); - TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle); - TFE_LOG_INFO(logger, "tfe fieldstat outpath : %s", outpath); + MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "proxy_rule_hits"); + MESA_load_profile_int_def(profile, section, "cycle", &cycle, 0); + MESA_load_profile_string_def(profile, section, "outpath", outpath, sizeof(outpath), "metrics/porxy_fieldstat.json"); + fieldstat_easy->manipulation = tfe_fieldstat_easy_manipulation_create(app_name, outpath, cycle, max_thread, logger); + + TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name); + TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle); + TFE_LOG_INFO(logger, "tfe fieldstat outpath : %s", outpath); + TFE_LOG_INFO(logger, "tfe output_fs_interval_ms : %d", output_fs_interval_ms); + TFE_LOG_INFO(logger, "tfe output_kafka_interval_ms : %d", output_kafka_interval_ms); return fieldstat_easy; } @@ -377,8 +384,8 @@ int tfe_env_init() return -1; } - fieldstat_easy = create_fieldstat4_instance(profile_path, "proxy_hits", thread_num, g_default_logger); - if(!fieldstat_easy) + fieldstat4_easy = create_fieldstat4_instance(profile_path, "proxy_hits", thread_num, g_default_logger); + if(!fieldstat4_easy) { return -1; } diff --git a/common/src/tfe_session_table.cpp b/common/src/tfe_session_table.cpp index 237537b..d9fac5e 100644 --- a/common/src/tfe_session_table.cpp +++ b/common/src/tfe_session_table.cpp @@ -212,7 +212,7 @@ struct session_node *session_table_search_by_addr(struct session_table *table, c return temp; } -void session_foreach(struct session_table *table, void (*func)(struct session_node *, void *), void *ctx) +void session_foreach(struct session_table *table, struct fieldstat_easy_intercept *metrics, int (*func)(struct fieldstat_easy_intercept *, void *, int), int thread_index) { struct session_node *temp = NULL; struct session_node *node = NULL; @@ -222,7 +222,7 @@ void session_foreach(struct session_table *table, void (*func)(struct session_no HASH_ITER(hh1, table->root_by_id, node, temp) { - func(node, ctx); + func(metrics, node->val_data, thread_index); } return; } \ No newline at end of file diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf index ddc6e7a..e1e43a9 100644 --- a/conf/tfe/tfe.conf +++ b/conf/tfe/tfe.conf @@ -27,10 +27,6 @@ data_center=center-xxg-tsgx device_group=group-xxg-tsgx device_id=9800165603247024 -[metrics] -output_fs_interval_ms=500 -output_kafka_interval_ms=1000 - # for enable kni v3 [nfq] device=tap0 @@ -222,8 +218,11 @@ full_cfg_dir=pangu_policy/full/index/ inc_cfg_dir=pangu_policy/inc/index/ [proxy_hits] -cycle=5 +cycle=0 app_name="proxy_rule_hits" +output_fs_interval_ms=500 +output_kafka_interval_ms=1000 +outpath="metrics/porxy_fieldstat.json" # for enable kni v4 [packet_io] diff --git a/platform/src/acceptor_kni_v4.cpp b/platform/src/acceptor_kni_v4.cpp index c49c90e..e94f55a 100644 --- a/platform/src/acceptor_kni_v4.cpp +++ b/platform/src/acceptor_kni_v4.cpp @@ -23,7 +23,6 @@ #include "tfe_fieldstat.h" #include "dablooms.h" #include "timestamp.h" -#include "metrics.h" void * g_packet_io_logger = NULL; @@ -47,13 +46,28 @@ void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx) { packet_io_destory(ctx->io); packet_io_fs_destory(ctx->packet_io_fs); - metrics_destory(ctx->metrics); free(ctx); ctx = NULL; } return; } +struct fieldstat_easy_intercept *packet_io_fieldstat_easy_create(const char *profile, void *logger) +{ + int packet_io_threads=0; + int output_fs_interval_ms=0; + char app_name[TFE_STRING_MAX]={0}; + struct fieldstat_easy_intercept *intercept=NULL; + + MESA_load_profile_int_def(profile, "packet_io", "packet_io_threads", &packet_io_threads, 0); + MESA_load_profile_int_def(profile, "proxy_hits", "output_fs_interval_ms", &output_fs_interval_ms, 500); + MESA_load_profile_string_def(profile, "proxy_hits", "app_name", app_name, sizeof(app_name), "proxy_rule_hits"); + + intercept = tfe_fieldstat_easy_intercept_create(app_name, packet_io_threads, output_fs_interval_ms, logger); + + return intercept; +} + struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile, void *logger) { struct acceptor_kni_v4 *ctx = ALLOC(struct acceptor_kni_v4, 1); @@ -90,12 +104,8 @@ struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile, void *logger) { goto error_out; } - - ctx->metrics = metrics_create(profile, tfe_get_kafka_handle()); - if(ctx->metrics == NULL) - { - goto error_out; - } + ctx->metrics = packet_io_fieldstat_easy_create(profile, logger); + tfe_get_fieldstat_handle()->intercept = ctx->metrics; return ctx; @@ -104,6 +114,19 @@ error_out: return NULL; } +void metrics_all_session_output(struct packet_io_thread_ctx *thread_ctx) +{ + if (thread_ctx == NULL) + return; + + int thread_index = thread_ctx->thread_index; + + struct session_table *session_table = thread_ctx->session_table; + struct fieldstat_easy_intercept *metrics=thread_ctx->ref_acceptor_ctx->metrics; + session_foreach(session_table, metrics, tfe_fieldstat_intercept_incrby, thread_index); + return; +} + static void *worker_thread_cycle(void *arg) { struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)arg; @@ -131,7 +154,7 @@ static void *worker_thread_cycle(void *arg) int timeout_ms = 0; uint64_t current_timestamp = current_time.tv_sec * 1000 + current_time.tv_nsec / 1000000; uint64_t metrics_last_send_ms = current_timestamp; - uint64_t metrics_output_interval_ms = metrics_get_interval(thread_ctx->ref_acceptor_ctx->metrics); + uint64_t metrics_output_interval_ms = tfe_fieldstat_get_output_interval(thread_ctx->ref_acceptor_ctx->metrics); snprintf(thread_name, sizeof(thread_name), "pkt:worker-%d", thread_index); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); diff --git a/plugin/business/doh/src/doh.cpp b/plugin/business/doh/src/doh.cpp index ca99c44..f51e9ce 100644 --- a/plugin/business/doh/src/doh.cpp +++ b/plugin/business/doh/src/doh.cpp @@ -824,7 +824,12 @@ int doh_on_data(const struct tfe_stream *stream, const struct tfe_http_session * void doh_send_metric_log(const struct tfe_stream * stream, struct doh_ctx *ctx, unsigned int thread_id) { size_t c2s_byte_num = 0, s2c_byte_num =0; - struct tfe_fieldstat_easy_t *fieldstat = tfe_get_fieldstat_handle(); + + struct filedstat_easy_manipulation *fieldstat = tfe_get_fieldstat_handle()->manipulation; + if(fieldstat == NULL) + { + return; + } fieldstat->tags[thread_id][TAG_VSYS_ID].value_longlong = ctx->result->vsys_id; fieldstat->tags[thread_id][TAG_RULE_ID].value_longlong = ctx->result->config_id; @@ -860,23 +865,10 @@ void doh_send_metric_log(const struct tfe_stream * stream, struct doh_ctx *ctx, out_bytes = c2s_byte_num; } - tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_HIT_COUNT], 1, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); - tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_IN_BYTES], in_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); - tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_OUT_BYTES], out_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + tfe_fieldstat_manipulation_incrby(fieldstat, fieldstat->counter_array[COLUMN_HIT_COUNT], 1, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + tfe_fieldstat_manipulation_incrby(fieldstat, fieldstat->counter_array[COLUMN_IN_BYTES], in_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + tfe_fieldstat_manipulation_incrby(fieldstat, fieldstat->counter_array[COLUMN_OUT_BYTES], out_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); - char **payload = NULL; - size_t payload_len = 0; - - fieldstat_easy_output_array(fieldstat->fseasy, &payload, &payload_len); - if (payload) - { - for (size_t i = 0; i < payload_len; i++) - { - kafka_send(tfe_get_kafka_handle(), TOPIC_RULE_HITS, payload[i], strlen(payload[i])); - FREE(&payload[i]); - } - FREE(&payload); - } return; } diff --git a/plugin/business/tsg-http/src/tsg_http.cpp b/plugin/business/tsg-http/src/tsg_http.cpp index 47dc44b..9a684ad 100644 --- a/plugin/business/tsg-http/src/tsg_http.cpp +++ b/plugin/business/tsg-http/src/tsg_http.cpp @@ -1404,7 +1404,12 @@ void proxy_send_metric_log(const struct tfe_stream * stream, struct proxy_http_c proxy_action_map[PX_ACTION_REJECT]="deny"; proxy_action_map[PX_ACTION_WHITELIST]="allow"; const char *manipulate_action_map[]= {"redirect","block","replace","hijack","insert","edit_element","run_script"}; - struct tfe_fieldstat_easy_t *fieldstat = tfe_get_fieldstat_handle(); + + struct filedstat_easy_manipulation *fieldstat = tfe_get_fieldstat_handle()->manipulation; + if(fieldstat == NULL) + { + return; + } for(i=0; i< ctx->n_enforce; i++) { @@ -1463,25 +1468,12 @@ void proxy_send_metric_log(const struct tfe_stream * stream, struct proxy_http_c in_bytes=0; out_bytes=0; } - - tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_HIT_COUNT], hit_cnt, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); - tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_IN_BYTES], in_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); - tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_OUT_BYTES], out_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + + tfe_fieldstat_manipulation_incrby(fieldstat, fieldstat->counter_array[COLUMN_HIT_COUNT], hit_cnt, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + tfe_fieldstat_manipulation_incrby(fieldstat, fieldstat->counter_array[COLUMN_IN_BYTES], in_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + tfe_fieldstat_manipulation_incrby(fieldstat, fieldstat->counter_array[COLUMN_OUT_BYTES], out_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); } - char **payload = NULL; - size_t payload_len = 0; - - fieldstat_easy_output_array(fieldstat->fseasy, &payload, &payload_len); - if (payload) - { - for (size_t i = 0; i < payload_len; i++) - { - kafka_send(tfe_get_kafka_handle(), TOPIC_RULE_HITS, payload[i], strlen(payload[i])); - FREE(&payload[i]); - } - FREE(&payload); - } return; } diff --git a/plugin/business/tsg-http/src/tsg_logger.cpp b/plugin/business/tsg-http/src/tsg_logger.cpp index 348e011..b3a324b 100644 --- a/plugin/business/tsg-http/src/tsg_logger.cpp +++ b/plugin/business/tsg-http/src/tsg_logger.cpp @@ -90,7 +90,7 @@ size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct e { TFE_LOG_ERROR(handle->local_logger, "Mpack writer destroy is error(%s), uuid: %s", mpack_error_to_string(errorno), uuid); } - kafka_send(tfe_get_kafka_handle(), TOPIC_FILE_STREAM, mpack_data, mpack_size); + kafka_send2(tfe_get_kafka_handle(), TOPIC_FILE_STREAM, mpack_data, mpack_size); free(mpack_data); mpack_data = NULL;