From a59c24c850eb7f88c1c5c40ab1a2de1e64138f8a Mon Sep 17 00:00:00 2001
From: wangkuan
Date: Tue, 9 Apr 2024 10:49:25 +0800
Subject: [PATCH] 24.02.1 Sync the full configuration templates for
 grootstream 1.2.2
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 Groot-Stream 最新全量配置模版/TSG/README.md           |   7 +
 .../TSG/etl_session_record_kafka_to_kafka            | 162 ++++++++++++++++++
 .../TSG/grootstream.yaml                             |  18 ++
 .../TSG/session_record_processed_kafka_to_clickhouse |  48 ++++++
 Groot-Stream 最新全量配置模版/TSG/udf.plugins         |  15 ++
 5 files changed, 250 insertions(+)
 create mode 100644 Groot-Stream 最新全量配置模版/TSG/README.md
 create mode 100644 Groot-Stream 最新全量配置模版/TSG/etl_session_record_kafka_to_kafka
 create mode 100644 Groot-Stream 最新全量配置模版/TSG/grootstream.yaml
 create mode 100644 Groot-Stream 最新全量配置模版/TSG/session_record_processed_kafka_to_clickhouse
 create mode 100644 Groot-Stream 最新全量配置模版/TSG/udf.plugins

diff --git a/Groot-Stream 最新全量配置模版/TSG/README.md b/Groot-Stream 最新全量配置模版/TSG/README.md
new file mode 100644
index 0000000..eb19dc5
--- /dev/null
+++ b/Groot-Stream 最新全量配置模版/TSG/README.md
@@ -0,0 +1,7 @@
+v1.2.2 (2024-04-08)
+https://git.mesalab.cn/galaxy/platform/groot-stream/-/releases/v1.2.2
+
+Core
+Sensitive values in configuration files can now be encrypted; AES Config Shade adds the options kafka.ssl.keystore.password, kafka.ssl.truststore.password, and kafka.ssl.key.password
+Connector
+The ClickHouse Sink Connector is now compatible with the AggregateFunction type
diff --git a/Groot-Stream 最新全量配置模版/TSG/etl_session_record_kafka_to_kafka b/Groot-Stream 最新全量配置模版/TSG/etl_session_record_kafka_to_kafka
new file mode 100644
index 0000000..1af1ba2
--- /dev/null
+++ b/Groot-Stream 最新全量配置模版/TSG/etl_session_record_kafka_to_kafka
@@ -0,0 +1,162 @@
+sources:
+  kafka_source:
+    type: kafka
+    # fields: # [array of object] Field List; if not set, all fields (Map) will be output.
+    properties:
+      topic: SESSION-RECORD
+      kafka.bootstrap.servers: "{{ kafka_source_servers }}"
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.security.protocol: SSL
+      kafka.ssl.endpoint.identification.algorithm: ""
+      kafka.ssl.keystore.location: /data/tsg/olap/flink/topology/data/keystore.jks
+      kafka.ssl.keystore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+      kafka.ssl.truststore.location: /data/tsg/olap/flink/topology/data/truststore.jks
+      kafka.ssl.truststore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+      kafka.ssl.key.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+      #kafka.security.protocol: SASL_PLAINTEXT
+      #kafka.sasl.mechanism: PLAIN
+      #kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+      kafka.group.id: etl_session_record_kafka_to_kafka-20231221
+      kafka.auto.offset.reset: latest
+    format: json
+
+processing_pipelines:
+  etl_processor: # [object] Processing Pipeline
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields: # [array of object] Field List; if set, the listed fields will be removed.
+    output_fields: # [array of object] Field List; if not set, all fields (Map) will be output.
+    functions: # [array of object] Function List
+      - function: ASN_LOOKUP
+        lookup_fields: [server_ip]
+        output_fields: [server_asn]
+        parameters:
+          option: IP_TO_ASN
+          kb_name: tsg_ip_asn
+
+      - function: ASN_LOOKUP
+        lookup_fields: [client_ip]
+        output_fields: [client_asn]
+        parameters:
+          option: IP_TO_ASN
+          kb_name: tsg_ip_asn
+
+      - function: SNOWFLAKE_ID
+        lookup_fields: ['']
+        output_fields: [log_id]
+        parameters:
+          data_center_id_num: 1
+
+      - function: JSON_EXTRACT
+        lookup_fields: [device_tag]
+        output_fields: [data_center]
+        filter:
+        parameters:
+          value_expression: $.tags[?(@.tag=='data_center')][0].value
+
+      - function: JSON_EXTRACT
+        lookup_fields: [device_tag]
+        output_fields: [device_group]
+        filter:
+        parameters:
+          value_expression: $.tags[?(@.tag=='device_group')][0].value
+
+      - function: CURRENT_UNIX_TIMESTAMP
+        output_fields: [processing_time]
+        parameters:
+          precision: seconds
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [__timestamp]
+        output_fields: [recv_time]
+        parameters:
+          precision: seconds
+
+      - function: EVAL
+        output_fields: [ingestion_time]
+        parameters:
+          value_expression: recv_time
+
+      - function: DOMAIN
+        lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
+        output_fields: [server_domain]
+        parameters:
+          option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+      - function: BASE64_DECODE_TO_STRING
+        output_fields: [mail_subject]
+        parameters:
+          value_field: mail_subject
+          charset_field: mail_subject_charset
+
+      - function: BASE64_DECODE_TO_STRING
+        output_fields: [mail_attachment_name]
+        parameters:
+          value_field: mail_attachment_name
+          charset_field: mail_attachment_name_charset
+
+      - function: PATH_COMBINE
+        lookup_fields: [rtp_pcap_path]
+        output_fields: [rtp_pcap_path]
+        parameters:
+          path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
+
+      - function: PATH_COMBINE
+        lookup_fields: [http_request_body]
+        output_fields: [http_request_body]
+        parameters:
+          path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
+
+      - function: PATH_COMBINE
+        lookup_fields: [http_response_body]
+        output_fields: [http_response_body]
+        parameters:
+          path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
+
+      - function: PATH_COMBINE
+        lookup_fields: [mail_eml_file]
+        output_fields: [mail_eml_file]
+        parameters:
+          path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
+
+      - function: PATH_COMBINE
+        lookup_fields: [packet_capture_file]
+        output_fields: [packet_capture_file]
+        parameters:
+          path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
+
+sinks:
+  kafka_sink:
+    type: kafka
+    properties:
+      topic: SESSION-RECORD-PROCESSED
+      kafka.bootstrap.servers: "{{ national_center_kafka_servers }}"
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      kafka.security.protocol: SASL_PLAINTEXT
+      kafka.sasl.mechanism: PLAIN
+      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+    format: json
+
+
+application:
+
+  env: # [object] Environment Variables
+    name: etl_session_record_kafka_to_kafka # [string] Job Name
+    shade.identifier: aes
+  pipeline:
+    object-reuse: true # [boolean] Object Reuse, default is false
+  topology:
+    - name: kafka_source
+      downstream: [etl_processor]
+    - name: etl_processor
+      downstream: [kafka_sink]
+    - name: kafka_sink
+
+
diff --git a/Groot-Stream 最新全量配置模版/TSG/grootstream.yaml b/Groot-Stream 最新全量配置模版/TSG/grootstream.yaml
new file mode 100644
index 0000000..a94e7e3
--- /dev/null
+++ b/Groot-Stream 最新全量配置模版/TSG/grootstream.yaml
@@ -0,0 +1,18 @@
+grootstream:
+  knowledge_base:
+#    - name: tsg_ip_asn
+#      fs_type: http
+#      fs_path: http://192.168.44.12:9999/v1/knowledge_base
+#      files:
+#        - f9f6bc91-2142-4673-8249-e097c00fe1ea
+
+    - name: tsg_ip_asn
+      fs_type: local
+      fs_path: /data/hdd/olap/flink/topology/data/
+      files:
+        - asn_builtin.mmdb
+  properties:
+    hos.path: http://192.168.44.12:9098/hos
+    hos.bucket.name.traffic_file: traffic_file_bucket
+    hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
+    scheduler.knowledge_base.update.interval.minutes: 5
diff --git a/Groot-Stream 最新全量配置模版/TSG/session_record_processed_kafka_to_clickhouse b/Groot-Stream 最新全量配置模版/TSG/session_record_processed_kafka_to_clickhouse
new file mode 100644
index 0000000..005b079
--- /dev/null
+++ b/Groot-Stream 最新全量配置模版/TSG/session_record_processed_kafka_to_clickhouse
@@ -0,0 +1,48 @@
+sources:
+  kafka_source:
+    type: kafka
+    # fields: # [array of object] Field List; if not set, all fields (Map) will be output.
+    properties:
+      topic: SESSION-RECORD
+      kafka.bootstrap.servers: "{{ kafka_sink_servers }}"
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      #kafka.security.protocol: SSL
+      #kafka.ssl.endpoint.identification.algorithm: ""
+      #kafka.ssl.keystore.location: /data/tsg/olap/flink/topology/data/keystore.jks
+      #kafka.ssl.keystore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+      #kafka.ssl.truststore.location: /data/tsg/olap/flink/topology/data/truststore.jks
+      #kafka.ssl.truststore.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+      #kafka.ssl.key.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+      kafka.security.protocol: SASL_PLAINTEXT
+      kafka.sasl.mechanism: PLAIN
+      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+      kafka.group.id: session_record_processed_kafka_to_clickhouse
+      kafka.auto.offset.reset: latest
+    format: json
+
+sinks:
+  clickhouse_sink:
+    type: clickhouse
+    properties:
+      host: "{{ clickhouse_servers }}"
+      table: tsg_galaxy_v3.session_record_local
+      batch.size: 100000
+      batch.interval: 30s
+      connection.user: e54c9568586180eede1506eecf3574e9
+      connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+
+
+application:
+
+  env: # [object] Environment Variables
+    name: session_record_processed_kafka_to_clickhouse # [string] Job Name
+    shade.identifier: aes
+  pipeline:
+    object-reuse: true # [boolean] Object Reuse, default is false
+  topology:
+    - name: kafka_source
+      downstream: [clickhouse_sink]
+    - name: clickhouse_sink
+
diff --git a/Groot-Stream 最新全量配置模版/TSG/udf.plugins b/Groot-Stream 最新全量配置模版/TSG/udf.plugins
new file mode 100644
index 0000000..1de2395
--- /dev/null
+++ b/Groot-Stream 最新全量配置模版/TSG/udf.plugins
@@ -0,0 +1,15 @@
+com.geedgenetworks.core.udf.AsnLookup
+com.geedgenetworks.core.udf.CurrentUnixTimestamp
+com.geedgenetworks.core.udf.DecodeBase64
+com.geedgenetworks.core.udf.Domain
+com.geedgenetworks.core.udf.Drop
+com.geedgenetworks.core.udf.Eval
+com.geedgenetworks.core.udf.FromUnixTimestamp
+com.geedgenetworks.core.udf.GenerateStringArray
+com.geedgenetworks.core.udf.GeoIpLookup +com.geedgenetworks.core.udf.JsonExtract +com.geedgenetworks.core.udf.PathCombine +com.geedgenetworks.core.udf.Rename +com.geedgenetworks.core.udf.SnowflakeId +com.geedgenetworks.core.udf.StringJoiner +com.geedgenetworks.core.udf.UnixTimestampConverter \ No newline at end of file
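
A note on the encrypted values in the configs above: both jobs set shade.identifier: aes, and the README entry records that AES Config Shade now also covers kafka.ssl.keystore.password, kafka.ssl.truststore.password, and kafka.ssl.key.password, which is why those properties (along with kafka.sasl.jaas.config, connection.user, and connection.password) hold hex ciphertext rather than plaintext. The sketch below is a minimal illustration of the general round-trip such a shade performs when loading a job file. The hard-coded key, the AES/ECB/PKCS5Padding cipher mode, the hex layout, and the class name AesConfigShadeSketch are assumptions made for the example, not Groot-Stream's actual implementation.

// Illustrative only: round-trips a sensitive property value to/from hex
// the way an AES config shade plausibly would. Requires Java 17+ (HexFormat).
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.HexFormat;

public class AesConfigShadeSketch {

    // Hypothetical 16-byte (AES-128) key; a real deployment would load the
    // key from a protected location rather than hard-coding it.
    private static final byte[] KEY = "0123456789abcdef".getBytes(StandardCharsets.UTF_8);

    // Encrypt a plaintext property value into the hex form seen in the configs.
    static String shade(String plain) throws Exception {
        Cipher c = Cipher.getInstance("AES/ECB/PKCS5Padding");
        c.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(KEY, "AES"));
        return HexFormat.of().formatHex(c.doFinal(plain.getBytes(StandardCharsets.UTF_8)));
    }

    // Decrypt a hex ciphertext back into the plaintext the connector would use.
    static String unshade(String hex) throws Exception {
        Cipher c = Cipher.getInstance("AES/ECB/PKCS5Padding");
        c.init(Cipher.DECRYPT_MODE, new SecretKeySpec(KEY, "AES"));
        return new String(c.doFinal(HexFormat.of().parseHex(hex)), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        String shaded = shade("keystore-secret");
        System.out.println(shaded);           // hex ciphertext, shaped like the values above
        System.out.println(unshade(shaded));  // keystore-secret
    }
}

Under these assumptions a plaintext of up to 15 bytes pads to a single 16-byte AES block, i.e. a 32-character hex string, which matches the shape of the short password values above, while longer plaintexts such as the SASL JAAS line span multiple blocks.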