From 393cc1381140f9dc2a4c6ff23f0e99880d66ca7f Mon Sep 17 00:00:00 2001 From: wangkuan Date: Fri, 22 Nov 2024 11:44:01 +0800 Subject: [PATCH] =?UTF-8?q?1.TSG-23685=20=20=E4=BF=AE=E6=94=B9=E6=AF=8F?= =?UTF-8?q?=E4=B8=AA=E6=95=B0=E6=8D=AE=E6=BA=90ETL=E6=A8=A1=E7=89=88?= =?UTF-8?q?=EF=BC=8C=E4=B8=8D=E5=86=8D=E7=94=B1OLAP=20=E7=94=9F=E6=88=90?= =?UTF-8?q?=20Log=20ID=20=EF=BC=88=E9=99=A4dos=E6=9A=82=E4=B8=8D=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=EF=BC=89=E3=80=822.TSG-23341=20=20OLAP=20=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E9=A2=84=E5=A4=84=E7=90=86=E6=94=AF=E6=8C=81Encryptio?= =?UTF-8?q?n=20in=20Transit=E3=80=823.=E7=BB=9F=E4=B8=80Aviator=E8=A1=A8?= =?UTF-8?q?=E8=BE=BE=E5=BC=8F=E8=AF=AD=E6=B3=95=E5=92=8C=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=96=B9=E5=BC=8F,=E6=89=80=E6=9C=89processor=E5=92=8C?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E4=BD=BF=E7=94=A8=E8=A1=A8=E8=BE=BE=E5=BC=8F?= =?UTF-8?q?=E4=B8=8D=E9=9C=80=E8=A6=81=E4=BD=BF=E7=94=A8event.=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../agg_app_protocol_traffic.yaml.j2 | 150 +++++++++++++++++ ...on_rule_metric_kafka_to_clickhouse.yaml.j2 | 144 ++++++++++++++++ .../templates-24.11/data_transporter.yaml.j2 | 55 +++++++ .../datapath_telemetry_record.yaml.j2 | 75 +++++++++ .../templates-24.11/proxy_event.yaml.j2 | 154 ++++++++++++++++++ .../templates-24.11/session_record.yaml.j2 | 153 +++++++++++++++++ .../traffic_sketch_metric.yaml.j2 | 86 ++++++++++ .../transaction_record.yaml.j2 | 126 ++++++++++++++ .../templates-24.11/voip_record.yaml.j2 | 126 ++++++++++++++ .../agg_app_protocol_traffic.yaml.j2 | 150 +++++++++++++++++ .../datapath_telemetry_record.yaml.j2 | 75 +++++++++ .../templates/proxy_event.yaml.j2 | 154 ++++++++++++++++++ .../templates/session_record.yaml.j2 | 153 +++++++++++++++++ .../templates/traffic_sketch_metric.yaml.j2 | 86 ++++++++++ .../templates/transaction_record.yaml.j2 | 126 ++++++++++++++ .../templates/voip_record.yaml.j2 | 126 
++++++++++++++ .../TSG-24.11/groot_stream/udf.plugins | 39 +++++ 17 files changed, 1978 insertions(+) create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/agg_app_protocol_traffic.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/agg_dos_protection_rule_metric_kafka_to_clickhouse.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/data_transporter.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/datapath_telemetry_record.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/proxy_event.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/session_record.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/traffic_sketch_metric.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/transaction_record.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/voip_record.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates/agg_app_protocol_traffic.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates/datapath_telemetry_record.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates/proxy_event.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates/session_record.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates/traffic_sketch_metric.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates/transaction_record.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/templates/voip_record.yaml.j2 create mode 100644 tsg_olap/upgrade/TSG-24.11/groot_stream/udf.plugins diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/agg_app_protocol_traffic.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/agg_app_protocol_traffic.yaml.j2 new file mode 
100644 index 0000000..01b0227 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/agg_app_protocol_traffic.yaml.j2 @@ -0,0 +1,150 @@ +sources: + kafka_source: + type: kafka + properties: + topic: NETWORK-TRAFFIC-METRIC + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 10000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} # app-protocol-merge-230510-1 + kafka.auto.offset.reset: latest + format: json + +filters: + name_filter: + type: aviator + properties: + expression: name == 'traffic_application_protocol_stat' + +processing_pipelines: + pre_processor: + type: projection + functions: + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ timestamp_ms ] + parameters: + interval: 1000 + precision: milliseconds + - function: EVAL + output_fields: [name] + parameters: + value_expression: "'application_protocol_stat'" + + explode_app_and_protocol_processor: + type: table + functions: + - function: PATH_UNROLL + lookup_fields: [ decoded_path, app] + output_fields: [ protocol_stack_id, app_name ] + parameters: + separator: "." 
+ + window_agg_processor: + type: aggregate + group_by_fields: [timestamp_ms, name, vsys_id, device_id, device_group, data_center, app_name, protocol_stack_id] + window_type: tumbling_processing_time + window_size: 1 + window_slide: 1 + mini_batch: false # false, true + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + output_fields: [ sessions ] + - function: NUMBER_SUM + lookup_fields: [ c2s_pkts ] + output_fields: [ c2s_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_pkts ] + output_fields: [ s2c_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_bytes ] + output_fields: [ c2s_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_bytes ] + output_fields: [ s2c_bytes ] + - function: NUMBER_SUM + lookup_fields: [ c2s_fragments ] + output_fields: [ c2s_fragments ] + - function: NUMBER_SUM + lookup_fields: [ s2c_fragments ] + output_fields: [ s2c_fragments ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_lost_bytes ] + output_fields: [ c2s_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_lost_bytes ] + output_fields: [ s2c_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_ooorder_pkts ] + output_fields: [ c2s_tcp_ooorder_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_ooorder_pkts ] + output_fields: [ s2c_tcp_ooorder_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_retransmitted_pkts ] + output_fields: [ c2s_tcp_retransmitted_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_retransmitted_pkts ] + output_fields: [ s2c_tcp_retransmitted_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_retransmitted_bytes ] + output_fields: [ c2s_tcp_retransmitted_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_retransmitted_bytes ] + output_fields: [ s2c_tcp_retransmitted_bytes ] + - function: NUMBER_SUM + lookup_fields: [ in_bytes ] + output_fields: [ in_bytes ] + - function: NUMBER_SUM + lookup_fields: [ out_bytes ] + output_fields: [ out_bytes ] + - 
function: NUMBER_SUM + lookup_fields: [ in_pkts ] + output_fields: [ in_pkts ] + - function: NUMBER_SUM + lookup_fields: [ out_pkts ] + output_fields: [ out_pkts ] + +sinks: + kafka_sink: + type: kafka + properties: + topic: APPLICATION-PROTOCOL-METRIC + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 100 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + log.failures.only: true + +application: + env: + name: {{ job_name }} # agg_app_protocol_traffic_kafka_to_kafka + shade.identifier: aes + pipeline: + object-reuse: true + topology: + - name: kafka_source + downstream: [name_filter] + - name: name_filter + downstream: [pre_processor] + - name: pre_processor + downstream: [explode_app_and_protocol_processor] + - name: explode_app_and_protocol_processor + downstream: [window_agg_processor] + - name: window_agg_processor + downstream: [kafka_sink] + - name: kafka_sink + downstream: [] \ No newline at end of file diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/agg_dos_protection_rule_metric_kafka_to_clickhouse.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/agg_dos_protection_rule_metric_kafka_to_clickhouse.yaml.j2 new file mode 100644 index 0000000..992be3e --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/agg_dos_protection_rule_metric_kafka_to_clickhouse.yaml.j2 @@ -0,0 +1,144 @@ +sources: + kafka_source: + type: kafka + watermark_timestamp: timestamp_ms + watermark_timestamp_unit: ms + watermark_lag: 60000 + properties: 
+ topic: DOS-PROTECTION-RULE-METRIC + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.client.id: DOS-PROTECTION-RULE-METRIC + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.ssl.keystore.location: + kafka.ssl.keystore.password: + kafka.ssl.truststore.location: + kafka.ssl.truststore.password: + kafka.ssl.key.password: + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.buffer.memory: + kafka.group.id: dos_event_kafka_to_clickhouse-20231221 + kafka.auto.offset.reset: latest + kafka.max.request.size: + kafka.compression.type: none + format: json + +processing_pipelines: + + pre_etl_processor: # [object] Processing Pipeline + type: projection + functions: # [array of object] Function List + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ recv_time ] + parameters: + precision: seconds + aggregate_processor: + type: aggregate + group_by_fields: [vsys_id,rule_uuid,server_ip,client_ip] + window_type: tumbling_event_time # tumbling_event_time,sliding_processing_time,sliding_event_time + window_size: 600 + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + - function: NUMBER_SUM + lookup_fields: [ bytes ] + - function: NUMBER_SUM + lookup_fields: [ pkts ] + output_fields: [ packets ] + - function: FIRST_VALUE + lookup_fields: [ client_country ] + - function: FIRST_VALUE + lookup_fields: [ server_country ] + - function: MIN + lookup_fields: [ timestamp_ms ] + output_fields: [ start_timestamp_ms ] + - function: MIN + lookup_fields: [ recv_time ] + - function: MAX + lookup_fields: [ timestamp_ms ] + output_fields: [ end_timestamp_ms ] + - function: FIRST_VALUE + lookup_fields: [ duration ] + 
post_etl_processor: # [object] Processing Pipeline + type: projection + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ end_timestamp_ms ] + output_fields: [ end_time ] + parameters: + precision: seconds + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ start_timestamp_ms ] + output_fields: [ start_time ] + parameters: + precision: seconds + - function: EVAL + output_fields: [ duration ] + parameters: + value_expression: "((end_time-start_time) > 0)? (end_time-start_time) : (duration/1000)" + - function: EVAL + output_fields: [ end_time ] + parameters: + value_expression: start_time + duration + - function: EVAL + output_fields: [ session_rate ] + parameters: + value_expression: math.round((double(sessions) / duration )*100)/100.0 + - function: EVAL + output_fields: [ packet_rate ] + parameters: + value_expression: math.round((double(packets) / duration ) *100)/100.0 + - function: EVAL + output_fields: [ bit_rate ] + parameters: + value_expression: math.round((double((bytes*8)) / duration) *100)/100.0 + - function: RENAME + parameters: + rename_fields: + client_ip: source_ip + client_country: source_country + server_ip: destination_ip + server_country: destination_country + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + table: tsg_galaxy_v3.dos_event_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + env: + name: dos_event_kafka_to_clickhouse + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + {{ topology }} + +# topology: +# - name: kafka_source +# downstream: [pre_etl_processor] +# - name: pre_etl_processor +# downstream: 
[aggregate_processor] +# - name: aggregate_processor +# downstream: [post_etl_processor] +# - name: post_etl_processor +# downstream: [clickhouse_sink] +# - name: clickhouse_sink + + diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/data_transporter.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/data_transporter.yaml.j2 new file mode 100644 index 0000000..928a7b4 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/data_transporter.yaml.j2 @@ -0,0 +1,55 @@ +sources: + kafka_source: + type: kafka + properties: + topic: {{ kafka_source_topic }} + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.client.id: {{ kafka_source_topic }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: raw + +sinks: + kafka_sink: + type: kafka + properties: + topic: {{ kafka_sink_topic }} + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.client.id: {{ kafka_sink_topic }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: raw + + clickhouse_sink: + type: clickhouse + 
properties: + host: {{ clickhouse_servers }} + table: tsg_galaxy_v3.traffic_sketch_metric_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + {{ topology }} + + diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/datapath_telemetry_record.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/datapath_telemetry_record.yaml.j2 new file mode 100644 index 0000000..20dfd5a --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/datapath_telemetry_record.yaml.j2 @@ -0,0 +1,75 @@ +sources: + kafka_source: + type: kafka + properties: + topic: DATAPATH-TELEMETRY-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.client.id: DATAPATH-TELEMETRY-RECORD + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: msgpack + +processing_pipelines: + etl_processor: + type: projection + functions: + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ recv_time ] + parameters: + precision: seconds + - function: BASE64_ENCODE_TO_STRING + lookup_fields: [packet] + output_fields: [packet] + parameters: + input_type: byte_array + + + +sinks: + kafka_sink: + type: kafka + properties: + topic: DATAPATH-TELEMETRY-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.client.id: DATAPATH-TELEMETRY-RECORD + 
kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: raw + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.datapath_telemetry_record_local + batch.size: 5000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + {{ topology }} + diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/proxy_event.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/proxy_event.yaml.j2 new file mode 100644 index 0000000..a88c310 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/proxy_event.yaml.j2 @@ -0,0 +1,154 @@ +sources: + kafka_source: + type: kafka + properties: + topic: PROXY-EVENT + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.client.id: PROXY-EVENT + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + 
kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + functions: + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.eml_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + + - function: HMAC + lookup_fields: [ subscriber_id ] + output_fields: [ subscriber_id_hmac ] + 
parameters: + secret_key: Galaxy2019# + algorithm: sha256 + output_format: base64 + - function: HMAC + lookup_fields: [ phone_number ] + output_fields: [ phone_number_hmac ] + parameters: + secret_key: Galaxy2019# + algorithm: sha256 + output_format: base64 + - function: ENCRYPT + lookup_fields: [ subscriber_id ] + output_fields: [ subscriber_id ] + parameters: + identifier: aes-128-gcm + - function: ENCRYPT + lookup_fields: [ phone_number ] + output_fields: [ phone_number ] + parameters: + identifier: aes-128-gcm + +sinks: + kafka_sink: + type: kafka + properties: + topic: PROXY-EVENT + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.client.id: PROXY-EVENT + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.proxy_event_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + kms.type: vault + pipeline: + object-reuse: true + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + 
projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/database/table/proxy_event/schema?option=encrypt_fields + {{ topology }} + diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/session_record.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/session_record.yaml.j2 new file mode 100644 index 0000000..d5f04c7 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/session_record.yaml.j2 @@ -0,0 +1,153 @@ +sources: + kafka_source: + type: kafka + properties: + topic: SESSION-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.client.id: SESSION-RECORD + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + functions: + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + 
parameters: + path: [props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.eml_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + + - function: HMAC + lookup_fields: [ subscriber_id ] + output_fields: [ subscriber_id_hmac ] + parameters: + secret_key: Galaxy2019# + algorithm: sha256 + output_format: base64 + - function: HMAC + lookup_fields: [ phone_number ] + output_fields: [ phone_number_hmac ] + parameters: + secret_key: Galaxy2019# + algorithm: sha256 + output_format: base64 + - function: ENCRYPT + lookup_fields: [ subscriber_id ] + output_fields: [ subscriber_id ] + parameters: + identifier: aes-128-gcm + - function: ENCRYPT + lookup_fields: [ phone_number ] + output_fields: [ phone_number ] + parameters: + identifier: aes-128-gcm + +sinks: + kafka_sink: + type: kafka + properties: + topic: SESSION-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.client.id: SESSION-RECORD + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: 
SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.session_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + kms.type: vault + pipeline: + object-reuse: true + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/database/table/session_record/schema?option=encrypt_fields + {{ topology }} diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/traffic_sketch_metric.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/traffic_sketch_metric.yaml.j2 new file mode 100644 index 0000000..71f2b3b --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/traffic_sketch_metric.yaml.j2 @@ -0,0 +1,86 @@ +sources: + kafka_source: + type: kafka + properties: + topic: TRAFFIC-SKETCH-METRIC + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 
454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_traffic_sketch_metric + kafka.auto.offset.reset: latest + kafka.compression.type: none + format: json + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: projection + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ recv_time ] + parameters: + precision: seconds + interval: 60 + - function: EVAL + output_fields: [ internal_ip ] + parameters: + value_expression: "(direction == 'Outbound')? client_ip : server_ip" + - function: EVAL + output_fields: [ external_ip ] + parameters: + value_expression: "(direction == 'Outbound')? server_ip : client_ip" + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ log_id ] + filter: + parameters: + data_center_id_num: 1 + + +sinks: + kafka_sink: + type: kafka + properties: + topic: TRAFFIC-SKETCH-METRIC + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_servers }} + table: tsg_galaxy_v3.traffic_sketch_metric_local + batch.size: 100000 + batch.interval: 30s + 
connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + + env: # [object] Environment Variables + name: etl_traffic_sketch_metric # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + {{ topology }} + + diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/transaction_record.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/transaction_record.yaml.j2 new file mode 100644 index 0000000..09e6d60 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/transaction_record.yaml.j2 @@ -0,0 +1,126 @@ +sources: + kafka_source: + type: kafka + properties: + topic: TRANSACTION-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.client.id: TRANSACTION-RECORD + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: projection + functions: + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: 
mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.eml_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + +sinks: + kafka_sink: + type: kafka + properties: + topic: TRANSACTION-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.client.id: TRANSACTION-RECORD + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host 
}} + table: tsg_galaxy_v3.transaction_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + {{ topology }} diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/voip_record.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/voip_record.yaml.j2 new file mode 100644 index 0000000..c3869d1 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates-24.11/voip_record.yaml.j2 @@ -0,0 +1,126 @@ +sources: + kafka_source: + type: kafka + properties: + topic: VOIP-CONVERSATION-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.client.id: VOIP-CONVERSATION-RECORD + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: projection + functions: + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + 
value_expression: recv_time + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.eml_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + +sinks: + kafka_sink: + type: kafka + properties: + topic: VOIP-CONVERSATION-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.client.id: VOIP-CONVERSATION-RECORD + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 
454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.voip_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + {{ topology }} diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/agg_app_protocol_traffic.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/agg_app_protocol_traffic.yaml.j2 new file mode 100644 index 0000000..01b0227 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/agg_app_protocol_traffic.yaml.j2 @@ -0,0 +1,150 @@ +sources: + kafka_source: + type: kafka + properties: + topic: NETWORK-TRAFFIC-METRIC + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 10000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} # 
app-protocol-merge-230510-1 + kafka.auto.offset.reset: latest + format: json + +filters: + name_filter: + type: aviator + properties: + expression: name == 'traffic_application_protocol_stat' + +processing_pipelines: + pre_processor: + type: projection + functions: + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ timestamp_ms ] + parameters: + interval: 1000 + precision: milliseconds + - function: EVAL + output_fields: [name] + parameters: + value_expression: "'application_protocol_stat'" + + explode_app_and_protocol_processor: + type: table + functions: + - function: PATH_UNROLL + lookup_fields: [ decoded_path, app] + output_fields: [ protocol_stack_id, app_name ] + parameters: + separator: "." + + window_agg_processor: + type: aggregate + group_by_fields: [timestamp_ms, name, vsys_id, device_id, device_group, data_center, app_name, protocol_stack_id] + window_type: tumbling_processing_time + window_size: 1 + window_slide: 1 + mini_batch: false # false, true + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + output_fields: [ sessions ] + - function: NUMBER_SUM + lookup_fields: [ c2s_pkts ] + output_fields: [ c2s_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_pkts ] + output_fields: [ s2c_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_bytes ] + output_fields: [ c2s_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_bytes ] + output_fields: [ s2c_bytes ] + - function: NUMBER_SUM + lookup_fields: [ c2s_fragments ] + output_fields: [ c2s_fragments ] + - function: NUMBER_SUM + lookup_fields: [ s2c_fragments ] + output_fields: [ s2c_fragments ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_lost_bytes ] + output_fields: [ c2s_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_lost_bytes ] + output_fields: [ s2c_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_ooorder_pkts ] + output_fields: [ c2s_tcp_ooorder_pkts ] + - function: NUMBER_SUM + 
lookup_fields: [ s2c_tcp_ooorder_pkts ] + output_fields: [ s2c_tcp_ooorder_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_retransmitted_pkts ] + output_fields: [ c2s_tcp_retransmitted_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_retransmitted_pkts ] + output_fields: [ s2c_tcp_retransmitted_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_retransmitted_bytes ] + output_fields: [ c2s_tcp_retransmitted_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_retransmitted_bytes ] + output_fields: [ s2c_tcp_retransmitted_bytes ] + - function: NUMBER_SUM + lookup_fields: [ in_bytes ] + output_fields: [ in_bytes ] + - function: NUMBER_SUM + lookup_fields: [ out_bytes ] + output_fields: [ out_bytes ] + - function: NUMBER_SUM + lookup_fields: [ in_pkts ] + output_fields: [ in_pkts ] + - function: NUMBER_SUM + lookup_fields: [ out_pkts ] + output_fields: [ out_pkts ] + +sinks: + kafka_sink: + type: kafka + properties: + topic: APPLICATION-PROTOCOL-METRIC + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 100 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + log.failures.only: true + +application: + env: + name: {{ job_name }} # agg_app_protocol_traffic_kafka_to_kafka + shade.identifier: aes + pipeline: + object-reuse: true + topology: + - name: kafka_source + downstream: [name_filter] + - name: name_filter + downstream: [pre_processor] + - name: pre_processor + downstream: [explode_app_and_protocol_processor] + - name: explode_app_and_protocol_processor 
+ downstream: [window_agg_processor] + - name: window_agg_processor + downstream: [kafka_sink] + - name: kafka_sink + downstream: [] \ No newline at end of file diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/datapath_telemetry_record.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/datapath_telemetry_record.yaml.j2 new file mode 100644 index 0000000..20dfd5a --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/datapath_telemetry_record.yaml.j2 @@ -0,0 +1,75 @@ +sources: + kafka_source: + type: kafka + properties: + topic: DATAPATH-TELEMETRY-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.client.id: DATAPATH-TELEMETRY-RECORD + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: msgpack + +processing_pipelines: + etl_processor: + type: projection + functions: + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ recv_time ] + parameters: + precision: seconds + - function: BASE64_ENCODE_TO_STRING + lookup_fields: [packet] + output_fields: [packet] + parameters: + input_type: byte_array + + + +sinks: + kafka_sink: + type: kafka + properties: + topic: DATAPATH-TELEMETRY-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.client.id: DATAPATH-TELEMETRY-RECORD + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: 
SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: raw + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.datapath_telemetry_record_local + batch.size: 5000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + {{ topology }} + diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/proxy_event.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/proxy_event.yaml.j2 new file mode 100644 index 0000000..a88c310 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/proxy_event.yaml.j2 @@ -0,0 +1,154 @@ +sources: + kafka_source: + type: kafka + properties: + topic: PROXY-EVENT + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.client.id: PROXY-EVENT + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + 
functions: + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.eml_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + + - function: HMAC + lookup_fields: [ subscriber_id ] + output_fields: [ subscriber_id_hmac ] + parameters: + secret_key: Galaxy2019# + algorithm: sha256 + output_format: base64 + - function: HMAC + lookup_fields: [ phone_number ] + output_fields: [ phone_number_hmac ] + parameters: + secret_key: Galaxy2019# + algorithm: sha256 + output_format: 
base64 + - function: ENCRYPT + lookup_fields: [ subscriber_id ] + output_fields: [ subscriber_id ] + parameters: + identifier: aes-128-gcm + - function: ENCRYPT + lookup_fields: [ phone_number ] + output_fields: [ phone_number ] + parameters: + identifier: aes-128-gcm + +sinks: + kafka_sink: + type: kafka + properties: + topic: PROXY-EVENT + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.client.id: PROXY-EVENT + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.proxy_event_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + kms.type: vault + pipeline: + object-reuse: true + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/database/table/proxy_event/schema?option=encrypt_fields + {{ topology }} + diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/session_record.yaml.j2 
b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/session_record.yaml.j2 new file mode 100644 index 0000000..d5f04c7 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/session_record.yaml.j2 @@ -0,0 +1,153 @@ +sources: + kafka_source: + type: kafka + properties: + topic: SESSION-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.client.id: SESSION-RECORD + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + functions: + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, 
props.hos.bucket.name.http_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.eml_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + + - function: HMAC + lookup_fields: [ subscriber_id ] + output_fields: [ subscriber_id_hmac ] + parameters: + secret_key: Galaxy2019# + algorithm: sha256 + output_format: base64 + - function: HMAC + lookup_fields: [ phone_number ] + output_fields: [ phone_number_hmac ] + parameters: + secret_key: Galaxy2019# + algorithm: sha256 + output_format: base64 + - function: ENCRYPT + lookup_fields: [ subscriber_id ] + output_fields: [ subscriber_id ] + parameters: + identifier: aes-128-gcm + - function: ENCRYPT + lookup_fields: [ phone_number ] + output_fields: [ phone_number ] + parameters: + identifier: aes-128-gcm + +sinks: + kafka_sink: + type: kafka + properties: + topic: SESSION-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.client.id: SESSION-RECORD + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 
454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.session_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + kms.type: vault + pipeline: + object-reuse: true + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + projection.encrypt.schema.registry.uri: 192.168.44.12:9999/v1/database/table/session_record/schema?option=encrypt_fields + {{ topology }} diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/traffic_sketch_metric.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/traffic_sketch_metric.yaml.j2 new file mode 100644 index 0000000..71f2b3b --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/traffic_sketch_metric.yaml.j2 @@ -0,0 +1,86 @@ +sources: + kafka_source: + type: kafka + properties: + topic: TRAFFIC-SKETCH-METRIC + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 
454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_traffic_sketch_metric + kafka.auto.offset.reset: latest + kafka.compression.type: none + format: json + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: projection + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ recv_time ] + parameters: + precision: seconds + interval: 60 + - function: EVAL + output_fields: [ internal_ip ] + parameters: + value_expression: "(direction == 'Outbound')? client_ip : server_ip" + - function: EVAL + output_fields: [ external_ip ] + parameters: + value_expression: "(direction == 'Outbound')? server_ip : client_ip" + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ log_id ] + filter: + parameters: + data_center_id_num: 1 + + +sinks: + kafka_sink: + type: kafka + properties: + topic: TRAFFIC-SKETCH-METRIC + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_servers }} + table: tsg_galaxy_v3.traffic_sketch_metric_local + batch.size: 100000 + batch.interval: 30s + 
connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + + env: # [object] Environment Variables + name: etl_traffic_sketch_metric # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + {{ topology }} + + diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/transaction_record.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/transaction_record.yaml.j2 new file mode 100644 index 0000000..09e6d60 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/transaction_record.yaml.j2 @@ -0,0 +1,126 @@ +sources: + kafka_source: + type: kafka + properties: + topic: TRANSACTION-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.client.id: TRANSACTION-RECORD + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: projection + functions: + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + 
charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.eml_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + +sinks: + kafka_sink: + type: kafka + properties: + topic: TRANSACTION-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.client.id: TRANSACTION-RECORD + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: 
tsg_galaxy_v3.transaction_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + {{ topology }} diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/voip_record.yaml.j2 b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/voip_record.yaml.j2 new file mode 100644 index 0000000..c3869d1 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/templates/voip_record.yaml.j2 @@ -0,0 +1,126 @@ +sources: + kafka_source: + type: kafka + properties: + topic: VOIP-CONVERSATION-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.client.id: VOIP-CONVERSATION-RECORD + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: projection + functions: + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: 
BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.rtp_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.http_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.eml_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.policy_capture_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + +sinks: + kafka_sink: + type: kafka + properties: + topic: VOIP-CONVERSATION-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.client.id: VOIP-CONVERSATION-RECORD + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 
454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.voip_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + properties: + hos.bucket.name.rtp_file: traffic_rtp_file_bucket + hos.bucket.name.http_file: traffic_http_file_bucket + hos.bucket.name.eml_file: traffic_eml_file_bucket + hos.bucket.name.policy_capture_file: traffic_policy_capture_file_bucket + {{ topology }} diff --git a/tsg_olap/upgrade/TSG-24.11/groot_stream/udf.plugins b/tsg_olap/upgrade/TSG-24.11/groot_stream/udf.plugins new file mode 100644 index 0000000..5195847 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.11/groot_stream/udf.plugins @@ -0,0 +1,39 @@ +com.geedgenetworks.core.udf.AsnLookup +com.geedgenetworks.core.udf.CurrentUnixTimestamp +com.geedgenetworks.core.udf.DecodeBase64 +com.geedgenetworks.core.udf.Domain +com.geedgenetworks.core.udf.Drop +com.geedgenetworks.core.udf.EncodeBase64 +com.geedgenetworks.core.udf.Encrypt +com.geedgenetworks.core.udf.Eval +com.geedgenetworks.core.udf.Flatten +com.geedgenetworks.core.udf.FromUnixTimestamp +com.geedgenetworks.core.udf.GenerateStringArray +com.geedgenetworks.core.udf.GeoIpLookup +com.geedgenetworks.core.udf.Hmac +com.geedgenetworks.core.udf.JsonExtract +com.geedgenetworks.core.udf.PathCombine +com.geedgenetworks.core.udf.Rename +com.geedgenetworks.core.udf.SnowflakeId +com.geedgenetworks.core.udf.StringJoiner 
+com.geedgenetworks.core.udf.UnixTimestampConverter +com.geedgenetworks.core.udf.udaf.NumberSum +com.geedgenetworks.core.udf.udaf.CollectList +com.geedgenetworks.core.udf.udaf.CollectSet +com.geedgenetworks.core.udf.udaf.LongCount +com.geedgenetworks.core.udf.udaf.Mean +com.geedgenetworks.core.udf.udaf.LastValue +com.geedgenetworks.core.udf.udaf.FirstValue +com.geedgenetworks.core.udf.udaf.hlld.Hlld +com.geedgenetworks.core.udf.udaf.hlld.HlldApproxCountDistinct +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogram +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantile +com.geedgenetworks.core.udf.udaf.HdrHistogram.HdrHistogramQuantiles +com.geedgenetworks.core.udf.udtf.JsonUnroll +com.geedgenetworks.core.udf.udtf.Unroll +com.geedgenetworks.core.udf.udtf.PathUnroll +com.geedgenetworks.core.udf.uuid.UUID +com.geedgenetworks.core.udf.uuid.UUIDv5 +com.geedgenetworks.core.udf.uuid.UUIDv7 +com.geedgenetworks.core.udf.udaf.Max +com.geedgenetworks.core.udf.udaf.Min \ No newline at end of file