From dc1ec1dbb3eeb9589ace653dcffba72b9fbfd284 Mon Sep 17 00:00:00 2001 From: fengweihao Date: Mon, 22 Jul 2024 17:22:45 +0800 Subject: [PATCH] =?UTF-8?q?TSG-21854=20TFE=E4=BD=BF=E7=94=A8fieldstat4?= =?UTF-8?q?=E5=BA=8F=E5=88=97=E5=8C=96Manipulation=20Policy=E7=9A=84metric?= =?UTF-8?q?=E5=B9=B6=E8=BE=93=E5=87=BA=E5=88=B0kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ci/travis.sh | 2 +- common/CMakeLists.txt | 2 +- common/include/tfe_fieldstat.h | 16 ++-- common/include/tfe_packet_io.h | 2 +- common/include/tfe_resource.h | 2 +- common/src/tfe_fieldstat.cpp | 90 +++++++++++------------ common/src/tfe_resource.cpp | 35 ++++----- conf/tfe/tfe.conf | 5 +- platform/CMakeLists.txt | 2 +- platform/src/acceptor_kni_v4.cpp | 29 ++++---- platform/src/ssl_stream.cpp | 8 +- plugin/business/doh/src/doh.cpp | 28 +++++-- plugin/business/tsg-http/src/tsg_http.cpp | 28 +++++-- vendor/CMakeLists.txt | 6 +- 14 files changed, 135 insertions(+), 120 deletions(-) diff --git a/ci/travis.sh b/ci/travis.sh index 0628e27..7575f85 100644 --- a/ci/travis.sh +++ b/ci/travis.sh @@ -34,7 +34,7 @@ env | sort # Install dependency from YUM yum install -y mrzcpd-corei7-4.* numactl-devel zlib-devel librdkafka-devel-1.2.2.1218b3c-1.el8.x86_64 librdkafka-1.2.2.1218b3c-1.el8.x86_64 systemd-devel -yum install -y libcjson-devel libmaatframe-devel libMESA_field_stat2-devel libfieldstat3-devel libfieldstat4-devel libMESA_handle_logger-devel libelua-devel +yum install -y libcjson-devel libmaatframe-devel libMESA_field_stat2-devel libfieldstat4-devel libMESA_handle_logger-devel libelua-devel yum install -y libMESA_htable-devel libMESA_prof_load-devel libwiredcfg-devel libWiredLB-devel sapp-devel libbreakpad_mini-devel yum install -y libasan yum install -y numactl-libs # required by mrzcpd diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 0aa8dfc..0240edc 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -10,7 +10,7 @@ target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/../bpf/) target_include_directories(common PRIVATE ${CMAKE_CURRENT_LIST_DIR}/../platform/include/internal) target_link_libraries(common PUBLIC libevent-static libevent-static-openssl libevent-static-pthreads rdkafka) -target_link_libraries(common PUBLIC MESA_handle_logger cjson bpf_obj mrzcpd MESA_prof_load maatframe fieldstat3) +target_link_libraries(common PUBLIC MESA_handle_logger cjson bpf_obj mrzcpd MESA_prof_load maatframe fieldstat4) target_link_libraries(common PUBLIC pthread) if (SUPPORT_LIBURING) diff --git a/common/include/tfe_fieldstat.h b/common/include/tfe_fieldstat.h index 878f14f..d22f741 100644 --- a/common/include/tfe_fieldstat.h +++ b/common/include/tfe_fieldstat.h @@ -7,7 +7,7 @@ extern "C" #endif #include -#include +#include "fieldstat/fieldstat_easy.h" enum metric_columns_index { @@ -29,19 +29,19 @@ enum metric_tags_index TAG_MAX }; -struct tfe_fieldstat_metric_t +struct tfe_fieldstat_easy_t { int table_id; int max_thread; struct fieldstat_tag **tags; - unsigned int column_array[COLUMN_MAX]; - struct fieldstat_dynamic_instance *instance; + int counter_array[COLUMN_MAX]; + struct fieldstat_easy *fseasy; }; -void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct session_ctx *s_ctx, int thread_id, int is_session_close); -int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id); -struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger); -void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat); +struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(char *app_name, char *outpath, int cycle, int max_thread, void *local_logger); +void tfe_set_intercept_metric(struct tfe_fieldstat_easy_t *fieldstat, struct session_ctx *s_ctx, int thread_id, int is_session_close); +void tfe_fieldstat_easy_destroy(struct tfe_fieldstat_easy_t *fieldstat); +int tfe_fieldstat_easy_incrby(struct tfe_fieldstat_easy_t *fieldstat, unsigned int counter_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id); #ifdef __cpluscplus } diff --git a/common/include/tfe_packet_io.h b/common/include/tfe_packet_io.h index b2427e5..a1ce898 100644 --- a/common/include/tfe_packet_io.h +++ b/common/include/tfe_packet_io.h @@ -93,7 +93,7 @@ struct acceptor_kni_v4 struct packet_io *io; struct packet_io_fs *packet_io_fs; - struct tfe_fieldstat_metric_t *metric; + struct tfe_fieldstat_easy_t *metric; struct packet_io_thread_ctx work_threads[TFE_THREAD_MAX]; struct tfe_proxy *ref_proxy; diff --git a/common/include/tfe_resource.h b/common/include/tfe_resource.h index a2f321e..fff74d7 100644 --- a/common/include/tfe_resource.h +++ b/common/include/tfe_resource.h @@ -48,4 +48,4 @@ const char *tfe_get_device_tag(); const char *tfe_get_sled_ip(); struct kafka *tfe_get_kafka_handle(); struct maat *tfe_get_maat_handle(); -struct tfe_fieldstat_metric_t *tfe_get_fieldstat_handle(); \ No newline at end of file +struct tfe_fieldstat_easy_t *tfe_get_fieldstat_handle(); \ No newline at end of file diff --git a/common/src/tfe_fieldstat.cpp b/common/src/tfe_fieldstat.cpp index 7527a13..f38bced 100644 --- a/common/src/tfe_fieldstat.cpp +++ b/common/src/tfe_fieldstat.cpp @@ -5,7 +5,7 @@ #include "tfe_resource.h" #include "tfe_packet_io.h" -void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct session_ctx *s_ctx, int thread_id, int is_session_close) +void tfe_set_intercept_metric(struct tfe_fieldstat_easy_t *fieldstat, struct session_ctx *s_ctx, int thread_id, int is_session_close) { int ret; int hit_count = 0; @@ -106,110 +106,107 @@ void tfe_set_intercept_metric(struct tfe_fieldstat_metric_t *fieldstat, struct s struct fieldstat_tag temp_tags[TAG_MAX] = {0}; temp_tags[nr_tags].key = "vsys_id"; - temp_tags[nr_tags].value_type = 0; - temp_tags[nr_tags].value_int = vsys_id; + temp_tags[nr_tags].type = TAG_INTEGER; + temp_tags[nr_tags].value_longlong = vsys_id; nr_tags++; temp_tags[nr_tags].key = "rule_id"; - temp_tags[nr_tags].value_type = 0; - temp_tags[nr_tags].value_int = rule_id; + temp_tags[nr_tags].type = TAG_INTEGER; + temp_tags[nr_tags].value_longlong = rule_id; nr_tags++; uint8_t pinning_status = 0; if (tfe_cmsg_get_value(cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&pinning_status, sizeof(pinning_status), &out_size) == 0) { temp_tags[nr_tags].key = "pinning_status"; - temp_tags[nr_tags].value_type = 0; - temp_tags[nr_tags].value_int = pinning_status; + temp_tags[nr_tags].type = TAG_INTEGER; + temp_tags[nr_tags].value_longlong = pinning_status; nr_tags++; } // action : 2 Intercept; 3 No Intercept temp_tags[nr_tags].key = "action"; - temp_tags[nr_tags].value_type = 0; - temp_tags[nr_tags].value_int = (hit_no_intercept == 1 ? 3 : 2); + temp_tags[nr_tags].type = TAG_INTEGER; + temp_tags[nr_tags].value_longlong = (hit_no_intercept == 1 ? 3 : 2); nr_tags++; // sub_action not need for intercept metrics if (hit_count > 0) { - fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, fieldstat->column_array[COLUMN_HIT_COUNT], "proxy_rule_hits", hit_count, temp_tags, (size_t)nr_tags, thread_id); + fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, fieldstat->counter_array[COLUMN_HIT_COUNT], temp_tags, (size_t)nr_tags, hit_count); } if (in_pkts > 0) { - fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, fieldstat->column_array[COLUMN_IN_PKTS], "proxy_rule_hits", in_pkts, temp_tags, (size_t)nr_tags, thread_id); + fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, fieldstat->counter_array[COLUMN_IN_PKTS], temp_tags, (size_t)nr_tags, in_pkts); } if (in_bytes > 0) { - fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, fieldstat->column_array[COLUMN_IN_BYTES], "proxy_rule_hits", in_bytes, temp_tags, (size_t)nr_tags, thread_id); + fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, fieldstat->counter_array[COLUMN_IN_BYTES], temp_tags, (size_t)nr_tags, in_bytes); } if (out_pkts > 0) { - fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, fieldstat->column_array[COLUMN_OUT_PKTS], "proxy_rule_hits", out_pkts, temp_tags, (size_t)nr_tags, thread_id); + fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, fieldstat->counter_array[COLUMN_OUT_PKTS], temp_tags, (size_t)nr_tags, out_pkts); } if (out_bytes > 0) { - fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, fieldstat->column_array[COLUMN_OUT_BYTES], "proxy_rule_hits", out_bytes, temp_tags, (size_t)nr_tags, thread_id); + fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, fieldstat->counter_array[COLUMN_OUT_BYTES], temp_tags, (size_t)nr_tags, out_bytes); } } -int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id) +int tfe_fieldstat_easy_incrby(struct tfe_fieldstat_easy_t *fieldstat, unsigned int counter_id, long long value, const struct fieldstat_tag tags[], int n_tags, int thread_id) { - return fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, column_id, "proxy_rule_hits", value, tags, (size_t)n_tags, thread_id); + return fieldstat_easy_counter_incrby(fieldstat->fseasy, thread_id, counter_id, tags, (size_t)n_tags, value); } -struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger) +struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(char *app_name, char *outpath, int cycle, int max_thread, void *local_logger) { - int i=0; + const char *counter_field[COLUMN_MAX] = {"hit_count", "in_bytes", "out_bytes", "in_pkts", "out_pkts"}; + struct fieldstat_tag metric_tags[TAG_MAX - 1] = {{"vsys_id", TAG_INTEGER, -1}, {"rule_id", TAG_INTEGER, -1}, {"action", TAG_INTEGER, -1}, {"sub_action", TAG_CSTRING, -1}}; - struct fieldstat_tag metric_tags[TAG_MAX - 1] = {{"vsys_id", 0, -1}, {"rule_id", 0, -1}, {"action", 0, -1}, {"sub_action", 2, -1}}; - const char *column_field[COLUMN_MAX] = {"hit_count", "in_bytes", "out_bytes", "in_pkts", "out_pkts"}; - enum field_type column_type[COLUMN_MAX] = {FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER}; - - struct tfe_fieldstat_metric_t *fieldstat = ALLOC(struct tfe_fieldstat_metric_t, 1); - - fieldstat->instance = fieldstat_dynamic_instance_new(app_name, max_thread); - if(!fieldstat->instance) + struct tfe_fieldstat_easy_t *fieldstat = ALLOC(struct tfe_fieldstat_easy_t, 1); + + fieldstat->fseasy = fieldstat_easy_new(max_thread, app_name, NULL, 0); + if(!fieldstat->fseasy) { - TFE_LOG_ERROR(local_logger, "fieldstat3 dynamic instance init failed."); + TFE_LOG_ERROR(local_logger, "fieldstat4 easy instance init failed."); + FREE(&fieldstat); return NULL; } + fieldstat_easy_enable_auto_output(fieldstat->fseasy, outpath, cycle); - fieldstat->max_thread=max_thread; - fieldstat_dynamic_set_line_protocol_server(fieldstat->instance, telegraf_ip, telegraf_port); - fieldstat_dynamic_set_output_interval(fieldstat->instance, cycle); + for(int i=0; icounter_array[i]=fieldstat_easy_register_counter(fieldstat->fseasy, counter_field[i]); + if(fieldstat->counter_array[i] < 0) + { + TFE_LOG_ERROR(local_logger, "fieldstat4 easy register counter failed."); + FREE(&fieldstat); + return NULL; + } + } - fieldstat->table_id = fieldstat_register_dynamic_table(fieldstat->instance, "proxy_rule_hits", column_field, column_type, (size_t)COLUMN_MAX, fieldstat->column_array); - if(fieldstat->table_id < 0) - { - TFE_LOG_ERROR(local_logger, "fieldstat3 register dynamic table failed."); - FREE(&fieldstat); - return NULL; - } - - fieldstat->tags = ALLOC(struct fieldstat_tag*, max_thread); - for (i = 0; i < max_thread; i++) + fieldstat->tags = ALLOC(struct fieldstat_tag*, max_thread); + for (int i = 0; i < max_thread; i++) { fieldstat->tags[i] = ALLOC(struct fieldstat_tag, TAG_MAX-1); memcpy(fieldstat->tags[i], metric_tags, sizeof(struct fieldstat_tag) * (size_t)(TAG_MAX-1)); } - fieldstat_dynamic_instance_start(fieldstat->instance); - return fieldstat; + return fieldstat; } -void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat) +void tfe_fieldstat_easy_destroy(struct tfe_fieldstat_easy_t *fieldstat) { - if(fieldstat) + if(fieldstat) { - if(fieldstat->instance) + if(fieldstat->fseasy) { - fieldstat_dynamic_instance_free(fieldstat->instance); + fieldstat_easy_free(fieldstat->fseasy); } for (int i = 0; i < fieldstat->max_thread; i++) @@ -223,4 +220,3 @@ void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat) FREE(&fieldstat); } } - diff --git a/common/src/tfe_resource.cpp b/common/src/tfe_resource.cpp index 8379a65..e811436 100644 --- a/common/src/tfe_resource.cpp +++ b/common/src/tfe_resource.cpp @@ -11,10 +11,9 @@ #define MAAT_INPUT_FILE 2 static int scan_table_id[__SCAN_COMMON_TABLE_MAX]; -static struct tfe_fieldstat_metric_t *dynamic_fieldstat = NULL; +static struct tfe_fieldstat_easy_t *fieldstat_easy = NULL; static char *device_tag=NULL; -struct tfe_fieldstat_metric_t *fieldstat_handle = NULL; struct kafka *kafka_handle = NULL; struct maat *maat_handle = NULL; @@ -64,36 +63,33 @@ struct maat *tfe_get_maat_handle() return maat_handle; } -struct tfe_fieldstat_metric_t *tfe_get_fieldstat_handle() +struct tfe_fieldstat_easy_t *tfe_get_fieldstat_handle() { - return fieldstat_handle; + return fieldstat_easy; } -static struct tfe_fieldstat_metric_t *create_fieldstat_instance(const char *profile, const char *section, int max_thread, void *logger) +static struct tfe_fieldstat_easy_t *create_fieldstat4_instance(const char *profile, const char *section, int max_thread, void *logger) { int cycle=0; - unsigned short telegraf_port=0; - char telegraf_ip[TFE_STRING_MAX]={0}; char app_name[TFE_STRING_MAX]={0}; - struct tfe_fieldstat_metric_t *dynamic_fieldstat=NULL; + char outpath[TFE_STRING_MAX]={0}; + struct tfe_fieldstat_easy_t *fieldstat_easy=NULL; - MESA_load_profile_short_nodef(profile, section, "telegraf_port", (short *)&(telegraf_port)); - MESA_load_profile_string_nodef(profile, section, "telegraf_ip", telegraf_ip, sizeof(telegraf_ip)); - MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "metric"); - MESA_load_profile_int_def(profile, section, "cycle", &cycle, 1000); + MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "proxy_rule_hits"); + MESA_load_profile_int_def(profile, section, "cycle", &cycle, 5); + MESA_load_profile_string_def(profile, section, "outpath", outpath, sizeof(outpath), "metrics/porxy_fieldstat.json"); - dynamic_fieldstat = tfe_fieldstat_metric_create(telegraf_ip, telegraf_port, app_name, cycle, max_thread, logger); - if (dynamic_fieldstat == NULL) + fieldstat_easy = tfe_fieldstat_easy_create(app_name, outpath, cycle, max_thread, logger); + if (fieldstat_easy == NULL) { TFE_LOG_ERROR(logger, "tfe fieldstat init failed, error to create fieldstat metric."); return NULL; } - TFE_LOG_INFO(logger, "tfe fieldstat telegraf_ip : %s", telegraf_ip); - TFE_LOG_INFO(logger, "tfe fieldstat telegraf_port : %d", telegraf_port); TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name); TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle); + TFE_LOG_INFO(logger, "tfe fieldstat outpath : %s", outpath); - return dynamic_fieldstat; + return fieldstat_easy; } static struct maat *create_maat_feather(const char *instance_name, const char *profile, const char *section, int max_thread, void *logger) @@ -227,7 +223,6 @@ error_out: static char* create_device_tag(const char *profile, const char *section, void *logger) { - char *c=NULL; char accept_path[TFE_PATH_MAX] = {0}, accept_tags[TFE_STRING_MAX] = {0}; MESA_load_profile_string_def(profile, section, "accept_path", accept_path, sizeof(accept_path), ""); @@ -382,8 +377,8 @@ int tfe_env_init() return -1; } - dynamic_fieldstat = create_fieldstat_instance(profile_path, "proxy_hits", thread_num, g_default_logger); - if (!dynamic_fieldstat) + fieldstat_easy = create_fieldstat4_instance(profile_path, "proxy_hits", thread_num, g_default_logger); + if(!fieldstat_easy) { return -1; } diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf index 4f13b38..d4b17a9 100644 --- a/conf/tfe/tfe.conf +++ b/conf/tfe/tfe.conf @@ -217,11 +217,8 @@ maat_redis_db_index=4 full_cfg_dir=pangu_policy/full/index/ inc_cfg_dir=pangu_policy/inc/index/ - [proxy_hits] -cycle=1000 -telegraf_port=8400 -telegraf_ip=127.0.0.1 +cycle=5 app_name="proxy_rule_hits" # for enable kni v4 diff --git a/platform/CMakeLists.txt b/platform/CMakeLists.txt index b2fe0a6..fa3d079 100644 --- a/platform/CMakeLists.txt +++ b/platform/CMakeLists.txt @@ -25,7 +25,7 @@ target_link_libraries(tfe pthread dl nfnetlink hiredis-static MESA_htable wiredcfg MESA_field_stat - fieldstat3 + fieldstat4 breakpad_mini ${SYSTEMD_LIBRARIES}) diff --git a/platform/src/acceptor_kni_v4.cpp b/platform/src/acceptor_kni_v4.cpp index ab73596..b4e25e1 100644 --- a/platform/src/acceptor_kni_v4.cpp +++ b/platform/src/acceptor_kni_v4.cpp @@ -39,31 +39,28 @@ static int tap_read(int tap_fd, char *buff, int buff_size, void *logger) return ret; } -static struct tfe_fieldstat_metric_t *create_fieldstat_instance(const char *profile, const char *section, int max_thread, void *logger) +static struct tfe_fieldstat_easy_t *create_fieldstat4_instance(const char *profile, const char *section, int max_thread, void *logger) { int cycle=0; - unsigned short telegraf_port=0; - char telegraf_ip[TFE_STRING_MAX]={0}; char app_name[TFE_STRING_MAX]={0}; - struct tfe_fieldstat_metric_t *dynamic_fieldstat=NULL; + char outpath[TFE_STRING_MAX]={0}; + struct tfe_fieldstat_easy_t *fieldstat_easy=NULL; - MESA_load_profile_short_nodef(profile, section, "telegraf_port", (short *)&(telegraf_port)); - MESA_load_profile_string_nodef(profile, section, "telegraf_ip", telegraf_ip, sizeof(telegraf_ip)); MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "metric"); - MESA_load_profile_int_def(profile, section, "cycle", &cycle, 1000); + MESA_load_profile_int_def(profile, section, "cycle", &cycle, 5); + MESA_load_profile_string_def(profile, section, "outpath", outpath, sizeof(outpath), "metrics/porxy_intercept_fieldstat.json"); - dynamic_fieldstat = tfe_fieldstat_metric_create(telegraf_ip, telegraf_port, app_name, cycle, max_thread, logger); - if (dynamic_fieldstat == NULL) + fieldstat_easy = tfe_fieldstat_easy_create(app_name, outpath, cycle, max_thread, logger); + if (fieldstat_easy == NULL) { TFE_LOG_ERROR(logger, "tfe fieldstat init failed, error to create fieldstat metric."); return NULL; } - TFE_LOG_INFO(logger, "tfe fieldstat telegraf_ip : %s", telegraf_ip); - TFE_LOG_INFO(logger, "tfe fieldstat telegraf_port : %d", telegraf_port); TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name); TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle); + TFE_LOG_INFO(logger, "tfe fieldstat outpath : %s", outpath); - return dynamic_fieldstat; + return fieldstat_easy; } void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx) @@ -72,7 +69,7 @@ void acceptor_kni_v4_destroy(struct acceptor_kni_v4 *ctx) { packet_io_destory(ctx->io); packet_io_fs_destory(ctx->packet_io_fs); - tfe_fieldstat_metric_destroy(ctx->metric); + tfe_fieldstat_easy_destroy(ctx->metric); free(ctx); ctx = NULL; } @@ -116,7 +113,7 @@ struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile, void *logger) goto error_out; } - ctx->metric = create_fieldstat_instance(profile, "proxy_hits", ctx->nr_worker_threads, logger); + ctx->metric = create_fieldstat4_instance(profile, "proxy_hits", ctx->nr_worker_threads, logger); if(ctx->metric == NULL) { goto error_out; @@ -171,7 +168,7 @@ static void *worker_thread_cycle(void *arg) io_uring_set_read_cb(io_uring_on_tap_s, handle_decryption_packet_from_tap, thread_ctx); } - TFE_LOG_INFO(logger, "worker thread %d is running", thread_index); + TFE_LOG_INFO(logger, "%s: worker thread %d is running", "LOG_TAG_KNI", thread_index); while (1) { @@ -216,7 +213,7 @@ static void *worker_thread_cycle(void *arg) } error_out: - TFE_LOG_ERROR(logger, "worker thread %d exiting", thread_index); + TFE_LOG_ERROR(logger, "%s: worker thread %d exiting", "LOG_TAG_KNI", thread_index); return (void *)NULL; } diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index fbbf6d3..84e9d51 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -2061,9 +2061,7 @@ void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct void ssl_stream_free(struct ssl_stream * s_stream, struct event_base * evbase, struct bufferevent * bev) { UNUSED struct ssl_shutdown_ctx * sslshutctx = NULL; - evutil_socket_t fd=-1; - fd=bufferevent_getfd(bev); - assert(fd==s_stream->_do_not_use.fd); + assert(bufferevent_getfd(bev)==s_stream->_do_not_use.fd); unsigned long sslerr=0; if (s_stream->dir == CONN_DIR_UPSTREAM) @@ -2217,6 +2215,7 @@ uint64_t ssl_stream_get_policy_id(struct ssl_stream *upstream) struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(upstream->tcp_stream); int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *)&policy_id, sizeof(policy_id), &out_size); assert(ret == 0); + (void)ret; return policy_id; } @@ -2228,6 +2227,7 @@ int ssl_stream_get_decrypted_profile_id(struct ssl_stream *upstream) struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(upstream->tcp_stream); int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_DECRYPTION_PROFILE_ID, (unsigned char *)&profile_id, sizeof(profile_id), &out_size); assert(ret == 0); + (void)ret; return profile_id; } @@ -2239,6 +2239,7 @@ int ssl_stream_get_trusted_keyring_profile_id(struct ssl_stream *upstream) struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(upstream->tcp_stream); int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_KEYRING_FOR_TRUSTED_ID, (unsigned char *)&keyring_id, sizeof(keyring_id), &out_size); assert(ret == 0); + (void)ret; return keyring_id; } @@ -2250,6 +2251,7 @@ int ssl_stream_get_untrusted_keyring_profile_id(struct ssl_stream *upstream) struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(upstream->tcp_stream); int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_KEYRING_FOR_UNTRUSTED, (unsigned char *)&keyring_id, sizeof(keyring_id), &out_size); assert(ret == 0); + (void)ret; return keyring_id; } diff --git a/plugin/business/doh/src/doh.cpp b/plugin/business/doh/src/doh.cpp index 15032b4..ca99c44 100644 --- a/plugin/business/doh/src/doh.cpp +++ b/plugin/business/doh/src/doh.cpp @@ -824,11 +824,11 @@ int doh_on_data(const struct tfe_stream *stream, const struct tfe_http_session * void doh_send_metric_log(const struct tfe_stream * stream, struct doh_ctx *ctx, unsigned int thread_id) { size_t c2s_byte_num = 0, s2c_byte_num =0; - struct tfe_fieldstat_metric_t *fieldstat = tfe_get_fieldstat_handle(); + struct tfe_fieldstat_easy_t *fieldstat = tfe_get_fieldstat_handle(); - fieldstat->tags[thread_id][TAG_VSYS_ID].value_int = ctx->result->vsys_id; - fieldstat->tags[thread_id][TAG_RULE_ID].value_int = ctx->result->config_id; - fieldstat->tags[thread_id][TAG_ACTION].value_int = 48; + fieldstat->tags[thread_id][TAG_VSYS_ID].value_longlong = ctx->result->vsys_id; + fieldstat->tags[thread_id][TAG_RULE_ID].value_longlong = ctx->result->config_id; + fieldstat->tags[thread_id][TAG_ACTION].value_longlong = 48; fieldstat->tags[thread_id][TAG_SUB_ACTION].value_str = "redirect"; tfe_stream_info_get(stream, INFO_FROM_DOWNSTREAM_RX_OFFSET, &c2s_byte_num, sizeof(c2s_byte_num)); @@ -860,9 +860,23 @@ void doh_send_metric_log(const struct tfe_stream * stream, struct doh_ctx *ctx, out_bytes = c2s_byte_num; } - tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_HIT_COUNT], 1, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); - tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_IN_BYTES], in_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); - tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_OUT_BYTES], out_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_HIT_COUNT], 1, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_IN_BYTES], in_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_OUT_BYTES], out_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + + char **payload = NULL; + size_t payload_len = 0; + + fieldstat_easy_output_array(fieldstat->fseasy, &payload, &payload_len); + if (payload) + { + for (size_t i = 0; i < payload_len; i++) + { + kafka_send(tfe_get_kafka_handle(), TOPIC_RULE_HITS, payload[i], strlen(payload[i])); + FREE(&payload[i]); + } + FREE(&payload); + } return; } diff --git a/plugin/business/tsg-http/src/tsg_http.cpp b/plugin/business/tsg-http/src/tsg_http.cpp index e30d25a..47dc44b 100644 --- a/plugin/business/tsg-http/src/tsg_http.cpp +++ b/plugin/business/tsg-http/src/tsg_http.cpp @@ -1404,13 +1404,13 @@ void proxy_send_metric_log(const struct tfe_stream * stream, struct proxy_http_c proxy_action_map[PX_ACTION_REJECT]="deny"; proxy_action_map[PX_ACTION_WHITELIST]="allow"; const char *manipulate_action_map[]= {"redirect","block","replace","hijack","insert","edit_element","run_script"}; - struct tfe_fieldstat_metric_t *fieldstat = tfe_get_fieldstat_handle(); + struct tfe_fieldstat_easy_t *fieldstat = tfe_get_fieldstat_handle(); for(i=0; i< ctx->n_enforce; i++) { - fieldstat->tags[thread_id][TAG_VSYS_ID].value_int = ctx->enforce_rules[i].vsys_id; - fieldstat->tags[thread_id][TAG_RULE_ID].value_int = ctx->enforce_rules[i].config_id; - fieldstat->tags[thread_id][TAG_ACTION].value_int = PX_ACTION_MANIPULATE; + fieldstat->tags[thread_id][TAG_VSYS_ID].value_longlong = ctx->enforce_rules[i].vsys_id; + fieldstat->tags[thread_id][TAG_RULE_ID].value_longlong = ctx->enforce_rules[i].config_id; + fieldstat->tags[thread_id][TAG_ACTION].value_longlong = PX_ACTION_MANIPULATE; if(ctx->enforce_rules[i].action == PX_ACTION_MANIPULATE) { fieldstat->tags[thread_id][TAG_SUB_ACTION].value_str = manipulate_action_map[ctx->param->action]; @@ -1464,9 +1464,23 @@ void proxy_send_metric_log(const struct tfe_stream * stream, struct proxy_http_c out_bytes=0; } - tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_HIT_COUNT], hit_cnt, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); - tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_IN_BYTES], in_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); - tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_OUT_BYTES], out_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_HIT_COUNT], hit_cnt, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_IN_BYTES], in_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + tfe_fieldstat_easy_incrby(fieldstat, fieldstat->counter_array[COLUMN_OUT_BYTES], out_bytes, fieldstat->tags[thread_id], TAG_MAX - 1, thread_id); + } + + char **payload = NULL; + size_t payload_len = 0; + + fieldstat_easy_output_array(fieldstat->fseasy, &payload, &payload_len); + if (payload) + { + for (size_t i = 0; i < payload_len; i++) + { + kafka_send(tfe_get_kafka_handle(), TOPIC_RULE_HITS, payload[i], strlen(payload[i])); + FREE(&payload[i]); + } + FREE(&payload); } return; } diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index 14922fc..c6eb3ff 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -153,9 +153,9 @@ add_library(MESA_field_stat SHARED IMPORTED GLOBAL) set_property(TARGET MESA_field_stat PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libMESA_field_stat2.so) set_property(TARGET MESA_field_stat PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}) -add_library(fieldstat3 SHARED IMPORTED GLOBAL) -set_property(TARGET fieldstat3 PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libfieldstat3.so) -set_property(TARGET fieldstat3 PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}) +add_library(fieldstat4 SHARED IMPORTED GLOBAL) +set_property(TARGET fieldstat4 PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libfieldstat4.so) +set_property(TARGET fieldstat4 PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}) add_library(rdkafka SHARED IMPORTED GLOBAL) set_property(TARGET rdkafka PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/librdkafka.so)