From 7c3b77fb2f8de60a93a6ae2ef1c36d1e6646dbce Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Sat, 6 May 2023 19:04:06 +0800 Subject: [PATCH] =?UTF-8?q?TSG-14890=20TFE=E8=BE=93=E5=87=BAIntercept=20Po?= =?UTF-8?q?licy=20Hits=20Metrics?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/include/tfe_cmsg.h | 6 +- common/include/tfe_fieldstat.h | 1 + common/src/intercept_policy.cpp | 12 +++ common/src/tfe_fieldstat.cpp | 149 ++++++++++++++++++++++++++++++++ platform/src/tcp_stream.cpp | 24 +++++ resource/pangu/pangu_http.json | 4 +- 6 files changed, 193 insertions(+), 3 deletions(-) diff --git a/common/include/tfe_cmsg.h b/common/include/tfe_cmsg.h index 7e3447a..9b32a44 100644 --- a/common/include/tfe_cmsg.h +++ b/common/include/tfe_cmsg.h @@ -101,8 +101,12 @@ enum tfe_cmsg_tlv_type TFE_CMSG_FQDN_CAT_ID_NUM, // unsigned int TFE_CMSG_FQDN_CAT_ID_VAL, // max size 8 * sizeof(unsigned int) - TFE_CMSG_COMMON_DIRECTION, + // according to KNI -> MESA_dir_link_to_human() + // 'E' or 'e': 表示发包方向是从Internal to External. + // 'I' or 'i': 表示发包方向是从External to Internal. + TFE_CMSG_COMMON_DIRECTION, // unsigned int TFE_CMSG_SSL_PASSTHROUGH_REASON, // string max size 32 + TFE_CMSG_POLICY_VSYS_ID, // unsigned int /* Add new cmsg here */ /* Add new cmsg here */ /* Add new cmsg here */ diff --git a/common/include/tfe_fieldstat.h b/common/include/tfe_fieldstat.h index 7b0ee5e..de472f3 100644 --- a/common/include/tfe_fieldstat.h +++ b/common/include/tfe_fieldstat.h @@ -38,6 +38,7 @@ struct tfe_fieldstat_metric_t struct fieldstat_dynamic_instance *instance; }; +void tfe_set_intercept_metric(struct tfe_stream *stream, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes); int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int thread_id); struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger); void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat); diff --git a/common/src/intercept_policy.cpp b/common/src/intercept_policy.cpp index 1175c42..134525a 100644 --- a/common/src/intercept_policy.cpp +++ b/common/src/intercept_policy.cpp @@ -6,6 +6,7 @@ struct intercept_param { + int vsys_id; uint64_t rule_id; int ref_cnt; int action; @@ -25,6 +26,7 @@ struct intercept_policy_enforcer static void intercept_param_new_cb(const char *table_name, int table_id, const char *key, const char *table_line, void **ad, long argl, void *argp) { int action = 0; + int vsys_id = 0; size_t len = 0; size_t offset = 0; char buffer[8] = {0}; @@ -63,6 +65,14 @@ static void intercept_param_new_cb(const char *table_name, int table_id, const c goto error_out; } + item = cJSON_GetObjectItem(json, "vsys_id"); + if (!item || !cJSON_IsNumber(item)) + { + TFE_LOG_ERROR(enforcer->logger, "Invalid intercept parameter: %s invalid vsys_id format", key); + goto error_out; + } + vsys_id = item->valueint; + item = cJSON_GetObjectItem(json, "protocol"); if (unlikely(!item || !cJSON_IsString(item))) { @@ -75,6 +85,7 @@ static void intercept_param_new_cb(const char *table_name, int table_id, const c } param = ALLOC(struct intercept_param, 1); + param->vsys_id = vsys_id; param->rule_id = atoll(key); param->ref_cnt = 1; param->action = action; @@ -287,6 +298,7 @@ int intercept_policy_enforce(struct intercept_policy_enforcer *enforcer, struct tfe_cmsg_set(cmsg, TFE_CMSG_SSL_PASSTHROUGH_REASON, (const unsigned char *)&reason_hit_no_intercept, strlen(reason_hit_no_intercept)); } + tfe_cmsg_set(cmsg, TFE_CMSG_POLICY_VSYS_ID, (const unsigned char *)¶m->vsys_id, sizeof(param->vsys_id)); tfe_cmsg_set(cmsg, TFE_CMSG_TCP_PASSTHROUGH, (const unsigned char *)&tcp_passthrough, sizeof(tcp_passthrough)); tfe_cmsg_set(cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (const unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept)); tfe_cmsg_set(cmsg, TFE_CMSG_TCP_OPTION_PROFILE_ID, (const unsigned char *)&(param->tcp_option_profile), sizeof(param->tcp_option_profile)); diff --git a/common/src/tfe_fieldstat.cpp b/common/src/tfe_fieldstat.cpp index 69a27b3..b6e815c 100644 --- a/common/src/tfe_fieldstat.cpp +++ b/common/src/tfe_fieldstat.cpp @@ -1,6 +1,155 @@ #include #include +#include "tfe_stream.h" +#include "tfe_resource.h" + +void tfe_set_intercept_metric(struct tfe_stream *stream, int hit_count, int downstream_rx_pkts, int downstream_rx_bytes, int upstream_rx_pkts, int upstream_rx_bytes) +{ + int ret; + uint16_t out_size; + struct tfe_fieldstat_metric_t *fieldstat = (struct tfe_fieldstat_metric_t *)tfe_bussiness_resouce_get(DYNAMIC_FIELDSTAT); + + struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(stream); + if (cmsg == NULL) + { + return; + } + + int vsys_id = 0; + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_VSYS_ID, (unsigned char *)&vsys_id, sizeof(vsys_id), &out_size); + if (ret != 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch vsys_id from cmsg: %s", strerror(-ret)); + return; + } + + uint64_t rule_id = 0; + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *)&rule_id, sizeof(rule_id), &out_size); + if (ret != 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch rule_id from cmsg: %s", strerror(-ret)); + return; + } + + uint8_t hit_no_intercept = 0; + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept), &out_size); + if (ret != 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch hit_no_intercept from cmsg: %s", strerror(-ret)); + return; + } + + // according to KNI -> MESA_dir_link_to_human() + // 'E' or 'e': 表示发包方向是从Internal to External. + // 'I' or 'i': 表示发包方向是从External to Internal. + unsigned int route_dir; + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_COMMON_DIRECTION, (unsigned char *)&route_dir, sizeof(route_dir), &out_size); + if (ret != 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch route_dir from cmsg: %s", strerror(-ret)); + return; + } + + int dir_is_e2i = 0; + switch (route_dir) + { + case 'e': + /* fall through */ + case 'E': + dir_is_e2i = 0; + break; + case 'i': + /* fall through */ + case 'I': + dir_is_e2i = 1; + break; + default: + TFE_LOG_ERROR(g_default_logger, "failed at fetch route dir from cmsg: invalid route dir %c", route_dir); + return; + } + + int in_pkts = 0; + int in_bytes = 0; + int out_pkts = 0; + int out_bytes = 0; + + // incoming : E2I 的流量 + // outcoming : I2E 的流量 + // first_ctr_packet_dir <==> client hello packet dir + if (dir_is_e2i == 1) + { + in_pkts = downstream_rx_pkts; + in_bytes = downstream_rx_bytes; + + out_pkts = upstream_rx_pkts; + out_bytes = upstream_rx_bytes; + } + else + { + in_pkts = upstream_rx_pkts; + in_bytes = upstream_rx_bytes; + + out_pkts = downstream_rx_pkts; + out_bytes = downstream_rx_bytes; + } + + int nr_tags = 0; + struct fieldstat_tag temp_tags[TAG_MAX] = {0}; + + temp_tags[nr_tags].key = "vsys_id"; + temp_tags[nr_tags].value_type = 0; + temp_tags[nr_tags].value_int = vsys_id; + nr_tags++; + + temp_tags[nr_tags].key = "rule_id"; + temp_tags[nr_tags].value_type = 0; + temp_tags[nr_tags].value_int = rule_id; + nr_tags++; + + uint8_t pinning_status = 0; + if (tfe_cmsg_get_value(cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&pinning_status, sizeof(pinning_status), &out_size) == 0) + { + temp_tags[nr_tags].key = "pinning_status"; + temp_tags[nr_tags].value_type = 0; + temp_tags[nr_tags].value_int = pinning_status; + nr_tags++; + } + + // action : 2 Intercept; 3 No Intercept + temp_tags[nr_tags].key = "action"; + temp_tags[nr_tags].value_type = 0; + temp_tags[nr_tags].value_int = (hit_no_intercept == 1 ? 3 : 2); + nr_tags++; + + // sub_action not need for intercept metrics + + if (hit_count > 0) + { + fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, fieldstat->column_array[COLUMN_HIT_COUNT], "proxy_rule_hits", hit_count, temp_tags, (size_t)nr_tags, stream->thread_id); + } + + if (in_pkts > 0) + { + fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, fieldstat->column_array[COLUMN_IN_PKTS], "proxy_rule_hits", in_pkts, temp_tags, (size_t)nr_tags, stream->thread_id); + } + + if (in_bytes > 0) + { + fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, fieldstat->column_array[COLUMN_IN_BYTES], "proxy_rule_hits", in_bytes, temp_tags, (size_t)nr_tags, stream->thread_id); + } + + if (out_pkts > 0) + { + fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, fieldstat->column_array[COLUMN_OUT_PKTS], "proxy_rule_hits", out_pkts, temp_tags, (size_t)nr_tags, stream->thread_id); + } + + if (out_bytes > 0) + { + fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, fieldstat->column_array[COLUMN_OUT_BYTES], "proxy_rule_hits", out_bytes, temp_tags, (size_t)nr_tags, stream->thread_id); + } +} + int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int thread_id) { return fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, column_id, "proxy_rule_hits", value, tags, (size_t)TAG_MAX, thread_id); diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 46c0b33..559aae3 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -491,6 +492,17 @@ static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg { TFE_PROXY_STAT_INCREASE(STAT_STREAM_BYPASS, 1); _stream->is_first_call_rxcb = 1; + tfe_set_intercept_metric(&_stream->head, 1, 0, 0, 0, 0); + } + + int inbuff_len = evbuffer_get_length(__input_buffer); + if (bev == _stream->conn_downstream->bev) + { + tfe_set_intercept_metric(&_stream->head, 0, 1, inbuff_len, 0, 0); + } + else + { + tfe_set_intercept_metric(&_stream->head, 0, 0, 0, 1, inbuff_len); } struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev); @@ -649,6 +661,13 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) assert(0); } + if (_stream->is_first_call_rxcb == 0) + { + TFE_PROXY_STAT_INCREASE(STAT_STREAM_INTERCEPT, 1); + _stream->is_first_call_rxcb = 1; + tfe_set_intercept_metric(&_stream->head, 1, 0, 0, 0, 0); + } + /* * Peer connection is terminated, drain all data. * This connection will be destoryed in __event_cb @@ -675,6 +694,7 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) TFE_PROXY_STAT_INCREASE(STAT_STEERING_CLIENT_TX_B, inbuff_len); // TODO: Delete the following code when support calling the tfe-plugin TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_DOWN_BYTES, inbuff_len); + tfe_set_intercept_metric(&_stream->head, 0, 1, inbuff_len, 0, 0); _stream->downstream_rx_offset += inbuff_len; } else @@ -682,6 +702,7 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) TFE_PROXY_STAT_INCREASE(STAT_STEERING_SERVER_TX_B, inbuff_len); // TODO: Delete the following code when support calling the tfe-plugin TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_UP_BYTES, inbuff_len); + tfe_set_intercept_metric(&_stream->head, 0, 0, 0, 1, inbuff_len); _stream->upstream_rx_offset += inbuff_len; } @@ -707,6 +728,7 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) { TFE_PROXY_STAT_INCREASE(STAT_STREAM_INTERCEPT, 1); _stream->is_first_call_rxcb = 1; + tfe_set_intercept_metric(&_stream->head, 1, 0, 0, 0, 0); } outbuf = bufferevent_get_output(peer_conn->bev); @@ -794,11 +816,13 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) if (dir == CONN_DIR_DOWNSTREAM) { TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_DOWN_BYTES, rx_offset_increase); + tfe_set_intercept_metric(&_stream->head, 0, 1, rx_offset_increase, 0, 0); _stream->downstream_rx_offset += rx_offset_increase; } else { TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_UP_BYTES, rx_offset_increase); + tfe_set_intercept_metric(&_stream->head, 0, 0, 0, 1, rx_offset_increase); _stream->upstream_rx_offset += rx_offset_increase; } diff --git a/resource/pangu/pangu_http.json b/resource/pangu/pangu_http.json index c2eee89..945e17c 100644 --- a/resource/pangu/pangu_http.json +++ b/resource/pangu/pangu_http.json @@ -261,8 +261,8 @@ { "table_name": "PXY_INTERCEPT_COMPILE", "table_content": [ - "0\t0\t2\t1\t1\t{}\t{\"protocol\":\"SSL\",\"keyring_for_trusted\":765,\"keyring_for_untrusted\":10,\"decryption\":0,\"tcp_option_profile\":1,\"traffic_mirror\":{\"enable\":0}}\t1\t2", - "4\t0\t2\t1\t1\t{}\t{\"protocol\":\"SSL\",\"keyring_for_trusted\":1,\"keyring_for_untrusted\":10,\"decryption\":0,\"tcp_option_profile\":1,\"traffic_mirror\":{\"enable\":1,\"mirror_profile\":1234}}\t1\t2" + "0\t0\t2\t1\t1\t{}\t{\"vsys_id\":1,\"protocol\":\"SSL\",\"keyring_for_trusted\":765,\"keyring_for_untrusted\":10,\"decryption\":0,\"tcp_option_profile\":1,\"traffic_mirror\":{\"enable\":0}}\t1\t2", + "4\t0\t2\t1\t1\t{}\t{\"vsys_id\":1,\"protocol\":\"SSL\",\"keyring_for_trusted\":1,\"keyring_for_untrusted\":10,\"decryption\":0,\"tcp_option_profile\":1,\"traffic_mirror\":{\"enable\":1,\"mirror_profile\":1234}}\t1\t2" ] }, {