From 566d5d51fb8829cd025aabf946d1ad1e5673c2ab Mon Sep 17 00:00:00 2001 From: wangkuan Date: Fri, 2 Aug 2024 16:42:33 +0800 Subject: [PATCH] =?UTF-8?q?traffic=5Fsketch=5Fmetric=E9=80=82=E9=85=8DMetr?= =?UTF-8?q?ic=E7=BB=93=E6=9E=84=E4=BF=AE=E6=94=B9=E4=B8=BA=E6=89=81?= =?UTF-8?q?=E5=B9=B3=E7=BB=93=E6=9E=84=20TSG-21936?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ..._traffic_sketch_metric_kafka_to_clickhouse | 143 ++++++++++++++++ .../templates/traffic_sketch_metric.yaml.j2 | 152 ++++++++++++++++++ .../TSG-24.08/groot_stream/udf.plugins | 24 +++ 3 files changed, 319 insertions(+) create mode 100644 tsg_olap/upgrade/TSG-24.08/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse create mode 100644 tsg_olap/upgrade/TSG-24.08/groot_stream/templates/traffic_sketch_metric.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.08/groot_stream/udf.plugins diff --git a/tsg_olap/upgrade/TSG-24.08/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse b/tsg_olap/upgrade/TSG-24.08/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse new file mode 100644 index 0000000..55f2aed --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.08/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse @@ -0,0 +1,143 @@ +sources: + kafka_source: + type: kafka + properties: + topic: TRAFFIC-SKETCH-METRIC + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_traffic_sketch_metric + kafka.auto.offset.reset: latest + kafka.compression.type: none + format: json + +processing_pipelines: + pre_etl_processor: # [object] Processing Pipeline + type: projection + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ recv_time ] + parameters: + precision: seconds + interval: 300 + + aggregate_processor: + type: aggregate + group_by_fields: [vsys_id,device_id,device_group,data_center,ip_protocol,direction,client_ip,server_ip,server_domain,app,recv_time] + window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 300 + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + - function: NUMBER_SUM + lookup_fields: [ bytes ] + - function: NUMBER_SUM + lookup_fields: [ sent_bytes ] + - function: NUMBER_SUM + lookup_fields: [ received_bytes ] + - function: NUMBER_SUM + lookup_fields: [ pkts ] + - function: NUMBER_SUM + lookup_fields: [ sent_pkts ] + - function: NUMBER_SUM + lookup_fields: [ received_pkts ] + - function: NUMBER_SUM + lookup_fields: [ asymmetric_c2s_flows ] + - function: NUMBER_SUM + lookup_fields: [ asymmetric_s2c_flows ] + - function: NUMBER_SUM + lookup_fields: [ c2s_fragments ] + - function: NUMBER_SUM + lookup_fields: [ s2c_fragments ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_retransmitted_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_retransmitted_pkts ] + - function: FIRST_VALUE + lookup_fields: [ client_country ] + - function: FIRST_VALUE + lookup_fields: [ server_country ] + - function: FIRST_VALUE + lookup_fields: [ client_asn ] + - function: FIRST_VALUE + lookup_fields: [ server_asn ] + - function: FIRST_VALUE + lookup_fields: [ server_fqdn ] + - function: FIRST_VALUE + lookup_fields: [ app_category ] + - function: FIRST_VALUE + lookup_fields: [ c2s_ttl ] + - function: FIRST_VALUE + lookup_fields: [ s2c_ttl ] + - function: FIRST_VALUE + lookup_fields: [ c2s_link_id ] + - function: FIRST_VALUE + lookup_fields: [ s2c_link_id ] + + + + post_etl_processor: # [object] Processing Pipeline + type: projection + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: EVAL + output_fields: [ internal_ip ] + parameters: + value_expression: 'direction=Outbound? client_ip : server_ip' + - function: EVAL + output_fields: [ external_ip ] + parameters: + value_expression: 'direction=Outbound? server_ip : client_ip' + + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ log_id ] + filter: + parameters: + data_center_id_num: 1 + + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + table: tsg_galaxy_v3.traffic_sketch_metric_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + + env: # [object] Environment Variables + name: etl_traffic_sketch_metric # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + topology: + - name: kafka_source + downstream: [pre_etl_processor] + - name: pre_etl_processor + downstream: [aggregate_processor] + - name: aggregate_processor + downstream: [post_etl_processor] + - name: post_etl_processor + downstream: [clickhouse_sink] + - name: clickhouse_sink + + diff --git a/tsg_olap/upgrade/TSG-24.08/groot_stream/templates/traffic_sketch_metric.yaml.j2 b/tsg_olap/upgrade/TSG-24.08/groot_stream/templates/traffic_sketch_metric.yaml.j2 new file mode 100644 index 0000000..51d8ee3 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.08/groot_stream/templates/traffic_sketch_metric.yaml.j2 @@ -0,0 +1,152 @@ +sources: + kafka_source: + type: kafka + properties: + topic: TRAFFIC-SKETCH-METRIC + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_traffic_sketch_metric + kafka.auto.offset.reset: latest + kafka.compression.type: none + format: json + +processing_pipelines: + pre_etl_processor: # [object] Processing Pipeline + type: projection + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ recv_time ] + parameters: + precision: seconds + interval: 300 + + aggregate_processor: + type: aggregate + group_by_fields: [vsys_id,device_id,device_group,data_center,ip_protocol,direction,client_ip,server_ip,server_domain,app,recv_time] + window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 300 + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + - function: NUMBER_SUM + lookup_fields: [ bytes ] + - function: NUMBER_SUM + lookup_fields: [ sent_bytes ] + - function: NUMBER_SUM + lookup_fields: [ received_bytes ] + - function: NUMBER_SUM + lookup_fields: [ pkts ] + - function: NUMBER_SUM + lookup_fields: [ sent_pkts ] + - function: NUMBER_SUM + lookup_fields: [ received_pkts ] + - function: NUMBER_SUM + lookup_fields: [ asymmetric_c2s_flows ] + - function: NUMBER_SUM + lookup_fields: [ asymmetric_s2c_flows ] + - function: NUMBER_SUM + lookup_fields: [ c2s_fragments ] + - function: NUMBER_SUM + lookup_fields: [ s2c_fragments ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_retransmitted_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_retransmitted_pkts ] + - function: FIRST_VALUE + lookup_fields: [ client_country ] + - function: FIRST_VALUE + lookup_fields: [ server_country ] + - function: FIRST_VALUE + lookup_fields: [ client_asn ] + - function: FIRST_VALUE + lookup_fields: [ server_asn ] + - function: FIRST_VALUE + lookup_fields: [ server_fqdn ] + - function: FIRST_VALUE + lookup_fields: [ app_category ] + - function: FIRST_VALUE + lookup_fields: [ c2s_ttl ] + - function: FIRST_VALUE + lookup_fields: [ s2c_ttl ] + - function: FIRST_VALUE + lookup_fields: [ c2s_link_id ] + - function: FIRST_VALUE + lookup_fields: [ s2c_link_id ] + + + + post_etl_processor: # [object] Processing Pipeline + type: projection + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: EVAL + output_fields: [ internal_ip ] + parameters: + value_expression: 'direction=Outbound? client_ip : server_ip' + - function: EVAL + output_fields: [ external_ip ] + parameters: + value_expression: 'direction=Outbound? server_ip : client_ip' + + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ log_id ] + filter: + parameters: + data_center_id_num: 1 + + +sinks: + kafka_sink: + type: kafka + properties: + topic: TRAFFIC-SKETCH-METRIC + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_servers }} + table: tsg_galaxy_v3.traffic_sketch_metric_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + + env: # [object] Environment Variables + name: etl_traffic_sketch_metric # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + {{ topology }} + + diff --git a/tsg_olap/upgrade/TSG-24.08/groot_stream/udf.plugins b/tsg_olap/upgrade/TSG-24.08/groot_stream/udf.plugins new file mode 100644 index 0000000..2978bbe --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.08/groot_stream/udf.plugins @@ -0,0 +1,24 @@ +com.geedgenetworks.core.udf.AsnLookup +com.geedgenetworks.core.udf.CurrentUnixTimestamp +com.geedgenetworks.core.udf.DecodeBase64 +com.geedgenetworks.core.udf.Domain +com.geedgenetworks.core.udf.Drop +com.geedgenetworks.core.udf.EncodeBase64 +com.geedgenetworks.core.udf.Eval +com.geedgenetworks.core.udf.Flatten +com.geedgenetworks.core.udf.FromUnixTimestamp +com.geedgenetworks.core.udf.GenerateStringArray +com.geedgenetworks.core.udf.GeoIpLookup +com.geedgenetworks.core.udf.JsonExtract +com.geedgenetworks.core.udf.PathCombine +com.geedgenetworks.core.udf.Rename +com.geedgenetworks.core.udf.SnowflakeId +com.geedgenetworks.core.udf.StringJoiner +com.geedgenetworks.core.udf.UnixTimestampConverter +com.geedgenetworks.core.udf.udaf.NumberSum +com.geedgenetworks.core.udf.udaf.CollectList +com.geedgenetworks.core.udf.udaf.CollectSet +com.geedgenetworks.core.udf.udaf.LongCount +com.geedgenetworks.core.udf.udaf.Mean +com.geedgenetworks.core.udf.udaf.LastValue +com.geedgenetworks.core.udf.udaf.FirstValue \ No newline at end of file