diff --git a/cyber_narrator/installation/groot_stream/README.md b/cyber_narrator/installation/groot_stream/README.md
index 395ba4f..f195e3f 100644
--- a/cyber_narrator/installation/groot_stream/README.md
+++ b/cyber_narrator/installation/groot_stream/README.md
@@ -1,2 +1,4 @@
-v1.2.4 (2024-04-08)
-https://git.mesalab.cn/galaxy/platform/groot-stream/-/releases/v1.2.4
+
+groot-stream version > 1.4.0
+
+etl_session_record_kafka_to_cn_kafka: whether the source topic is SESSION-RECORD or SESSION-RECORD-PROCESSED depends on the deployment environment
\ No newline at end of file
diff --git a/cyber_narrator/installation/groot_stream/etl_session_record_processed_kafka_to_cn_kafka b/cyber_narrator/installation/groot_stream/etl_session_record_kafka_to_cn_kafka
similarity index 89%
rename from cyber_narrator/installation/groot_stream/etl_session_record_processed_kafka_to_cn_kafka
rename to cyber_narrator/installation/groot_stream/etl_session_record_kafka_to_cn_kafka
index 6e90df7..82b4855 100644
--- a/cyber_narrator/installation/groot_stream/etl_session_record_processed_kafka_to_cn_kafka
+++ b/cyber_narrator/installation/groot_stream/etl_session_record_kafka_to_cn_kafka
@@ -3,7 +3,7 @@ sources:
     type: kafka
     # fields:          # [array of object] Field List, if not set, all fields(Map) will be output.
     properties:        # [object] Source Properties
-      topic: SESSION-RECORD-PROCESSED
+      topic: {{ tsg_olap_kafka_session_record_or_session_record_processed_topic }} # SESSION-RECORD/SESSION-RECORD-PROCESSED
       kafka.bootstrap.servers: {{ tsg_olap_kafka_servers }}
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
@@ -11,7 +11,7 @@ sources:
       kafka.security.protocol: SASL_PLAINTEXT
       kafka.sasl.mechanism: PLAIN
       kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
-      kafka.group.id: etl_processed_session_record_kafka_to_cn_kafka
+      kafka.group.id: etl_session_record_kafka_to_cn_kafka
       kafka.auto.offset.reset: latest
     format: json
@@ -21,6 +21,28 @@ processing_pipelines:
     remove_fields:
     output_fields:
     functions:         # [array of object] Function List
+      - function: SNOWFLAKE_ID
+        lookup_fields: [ '' ]
+        output_fields: [ cn_log_id ]
+        parameters:
+          data_center_id_num: 1
+
+      - function: EVAL
+        output_fields: [ log_id ]
+        parameters:
+          value_expression: "log_id == null ? cn_log_id : log_id"
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ __timestamp ]
+        output_fields: [ kafka_recv_time ]
+        parameters:
+          precision: seconds
+
+      - function: EVAL
+        output_fields: [ recv_time ]
+        parameters:
+          value_expression: "recv_time == null ? kafka_recv_time : recv_time"
+
       - function: EVAL
         output_fields: [ domain ]
         parameters:
@@ -261,19 +283,19 @@ processing_pipelines:
         parameters:
           value_expression: "client_zone == 'external' ? sessions : external_query_num"

-      - function: CN_VPN_LOOKUP
+      - function: CN_ANONYMITY_LOOKUP
         lookup_fields: [ server_ip ]
-        output_fields: [ server_vpn_service_name ]
+        output_fields: [ server_node_type ]
         parameters:
-          kb_name: cn_vpn_learning_ip
-          option: IP_TO_VPN
+          kb_name: cn_ioc_darkweb
+          option: IP_TO_NODE_TYPE

-      - function: CN_VPN_LOOKUP
+      - function: CN_ANONYMITY_LOOKUP
         lookup_fields: [ domain ]
-        output_fields: [ domain_vpn_service_name ]
+        output_fields: [ domain_node_type ]
         parameters:
-          kb_name: cn_vpn_learning_domain
-          option: DOMAIN_TO_VPN
+          kb_name: cn_ioc_darkweb
+          option: DOMAIN_TO_NODE_TYPE

       - function: CN_IOC_LOOKUP
         lookup_fields: [ server_ip ]
@@ -289,69 +311,39 @@ processing_pipelines:
           kb_name: cn_ioc_malware
           option: DOMAIN_TO_MALWARE

-      - function: CN_USER_DEFINE_TAG_LOOKUP
+      - function: CN_INTELLIGENCE_INDICATOR_LOOKUP
         lookup_fields: [ client_ip ]
         output_fields: [ client_ip_tags ]
         parameters:
-          kb_name: cn_ip_tag_user_define
+          kb_name: cn_intelligence_indicator
           option: IP_TO_TAG

-      - function: CN_USER_DEFINE_TAG_LOOKUP
+      - function: CN_INTELLIGENCE_INDICATOR_LOOKUP
         lookup_fields: [ server_ip ]
         output_fields: [ server_ip_tags ]
         parameters:
-          kb_name: cn_ip_tag_user_define
+          kb_name: cn_intelligence_indicator
           option: IP_TO_TAG

-      - function: CN_USER_DEFINE_TAG_LOOKUP
+      - function: CN_INTELLIGENCE_INDICATOR_LOOKUP
         lookup_fields: [ domain ]
         output_fields: [ domain_tags ]
         parameters:
-          kb_name: cn_domain_tag_user_define
+          kb_name: cn_intelligence_indicator
           option: DOMAIN_TO_TAG

-      - function: CN_USER_DEFINE_TAG_LOOKUP
-        lookup_fields: [ app ]
-        output_fields: [ app_tags ]
-        parameters:
-          kb_name: cn_app_tag_user_define
-          option: APP_TO_TAG
-
       - function: GENERATE_STRING_ARRAY
         lookup_fields: [ client_idc_renter,client_ip_tags ]
         output_fields: [ client_ip_tags ]

       - function: GENERATE_STRING_ARRAY
-        lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ]
+        lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_ip_tags ]
         output_fields: [ server_ip_tags ]

       - function: GENERATE_STRING_ARRAY
-        lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ]
+        lookup_fields: [ domain_node_type,domain_malware,domain_tags ]
         output_fields: [ domain_tags ]

-      - function: CN_ARRAY_ELEMENTS_PREPEND
-        lookup_fields: [ client_ip_tags ]
-        output_fields: [ client_ip_tags ]
-        parameters:
-          prefix: ip.
-
-      - function: CN_ARRAY_ELEMENTS_PREPEND
-        lookup_fields: [ server_ip_tags ]
-        output_fields: [ server_ip_tags ]
-        parameters:
-          prefix: ip.
-
-      - function: CN_ARRAY_ELEMENTS_PREPEND
-        lookup_fields: [ domain_tags ]
-        output_fields: [ domain_tags ]
-        parameters:
-          prefix: domain.
-
-      - function: CN_ARRAY_ELEMENTS_PREPEND
-        lookup_fields: [ app_tags ]
-        output_fields: [ app_tags ]
-        parameters:
-          prefix: app.
 postprocessing_pipelines:
   remove_field_processor:            # [object] Processing Pipeline
     type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
@@ -377,7 +369,7 @@ sinks:
 application:
   env:
-    name: etl_session_record_processed_kafka_to_cn_kafka
+    name: etl_session_record_kafka_to_cn_kafka
     shade.identifier: aes
   pipeline:
     object-reuse: true
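The four functions added at the head of this pipeline backfill `log_id` and `recv_time` for records that arrive without them: SNOWFLAKE_ID and UNIX_TIMESTAMP_CONVERTER first materialize candidate values (`cn_log_id` from the snowflake generator, `kafka_recv_time` from the Kafka record timestamp `__timestamp`), then the two EVALs keep the original field when it is already set. A rough sketch of the effect on a single record — the field values are illustrative only, not from a real deployment:

```yaml
# Input record missing both fields (invented values):
#   { app: "dns", __timestamp: 1712540000000 }
# After SNOWFLAKE_ID and UNIX_TIMESTAMP_CONVERTER:
#   cn_log_id: 7254012811234567890    # generated with data_center_id_num: 1
#   kafka_recv_time: 1712540000       # __timestamp (ms) converted to seconds
# After the two EVAL fallbacks:
#   log_id: 7254012811234567890       # "log_id == null ? cn_log_id : log_id"
#   recv_time: 1712540000             # "recv_time == null ? kafka_recv_time : recv_time"
```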
diff --git a/cyber_narrator/installation/groot_stream/grootstream.yaml b/cyber_narrator/installation/groot_stream/grootstream.yaml
index 7e75185..e65976a 100644
--- a/cyber_narrator/installation/groot_stream/grootstream.yaml
+++ b/cyber_narrator/installation/groot_stream/grootstream.yaml
@@ -60,18 +60,6 @@ grootstream:
       files:
         - 12

-    - name: cn_vpn_learning_ip
-      fs_type: http
-      fs_path: http://192.168.44.55:9999/v1/knowledge_base
-      files:
-        - 15
-
-    - name: cn_vpn_learning_domain
-      fs_type: http
-      fs_path: http://192.168.44.55:9999/v1/knowledge_base
-      files:
-        - 14
-
     - name: cn_ioc_darkweb
       fs_type: http
       fs_path: http://192.168.44.55:9999/v1/knowledge_base
@@ -84,17 +72,11 @@ grootstream:
       files:
         - 7

-    - name: cn_ip_tag_user_define
+    - name: cn_intelligence_indicator
       fs_type: http
-      fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_ip_tag_user_defined
-
-    - name: cn_domain_tag_user_define
-      fs_type: http
-      fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_domain_tag_user_defined
-
-    - name: cn_app_tag_user_define
-      fs_type: http
-      fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_app_tag_user_defined
+      fs_path: http://192.168.44.55:9999/v1/knowledge_base
+      files:
+        - 16

     - name: cn_rule
       fs_type: http
diff --git a/cyber_narrator/installation/groot_stream/udf.plugins b/cyber_narrator/installation/groot_stream/udf.plugins
index 8ecce84..eb9fd57 100644
--- a/cyber_narrator/installation/groot_stream/udf.plugins
+++ b/cyber_narrator/installation/groot_stream/udf.plugins
@@ -14,6 +14,8 @@ com.geedgenetworks.core.udf.cn.IpZoneLookup
 com.geedgenetworks.core.udf.cn.VpnLookup
 com.geedgenetworks.core.udf.cn.AnonymityLookup
 com.geedgenetworks.core.udf.cn.IocLookup
-com.geedgenetworks.core.udf.cn.UserDefineTagLookup
 com.geedgenetworks.core.udf.cn.FieldsMerge
 com.geedgenetworks.core.udf.cn.ArrayElementsPrepend
+com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup
+com.geedgenetworks.core.udf.SnowflakeId
+com.geedgenetworks.core.udf.UnixTimestampConverter
diff --git a/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/business.properties b/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/business.properties
new file mode 100644
index 0000000..133582c
--- /dev/null
+++ b/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/business.properties
@@ -0,0 +1,12 @@
+# session-record-cn
+#cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence
+# pre-metrics
+cn.pre.metric.class=com.zdjizhi.pre.base.CnPreMetric
+# relation
+cn.pre.relation.metric.class=com.zdjizhi.pre.relation.CnRelationMetric
+# dns-metrics
+cn.dns.pre.metric.class=com.zdjizhi.pre.dns.DnsPreMetric
+# detection
+cn.detection.indicator.class=com.zdjizhi.schedule.indicator.IndicatorSchedule
+# location
+cn.location.metric.class=com.zdjizhi.pre.location.LocationMetric
\ No newline at end of file
diff --git a/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/common.properties b/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/common.properties
new file mode 100644
index 0000000..3d9a819
--- /dev/null
+++ b/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/common.properties
@@ -0,0 +1,41 @@
+# job name
+stream.execution.job.name=cn_stream
+# default parallelism
+stream.execution.environment.parallelism={{ flink.cn_stream.parallelism }}
+# kafka source parallelism
+session.record.completed.parallelism={{ flink.cn_stream.parallelism }}
+# session-record-cn sink parallelism
+cn.record.parallelism={{ flink.cn_stream.parallelism }}
+# pre-metrics sink parallelism
+metric.output.parallelism={{ flink.cn_stream.parallelism }}
+# dns-metrics sink parallelism
+dns.metric.output.parallelism={{ flink.cn_stream.parallelism }}
+# relation sink parallelism
+metric.entity.relation.output.parallelism={{ flink.cn_stream.parallelism }}
+# dynamic attribute sink parallelism
+metric.dynamic.attribute.output.parallelism={{ flink.cn_stream.parallelism }}
+# subscriber-app relation sink parallelism
+metric.subscriber.app.relation.output.parallelism={{ flink.cn_stream.parallelism }}
+# location sink parallelism
+location.metric.output.parallelism={{ flink.cn_stream.parallelism }}
+# kafka consumer
+kafka.input.bootstrap.servers={{ kafka_source_servers }}
+session.record.completed.topic=SESSION-RECORD-CN
+session.record.completed.group.id=session-record-cn-stream
+# kafka consumer sasl 0:off 1:on
+input.sasl.jaas.config.flag=1
+# clickhouse
+clickhouse.address={{ clickhouse_servers }}
+clickhouse.user=LXDp+zqdQqDIIqaDfqsKoA==
+clickhouse.password=RY+0nruXpPqITsQ3ob4P7Qbd8W246+Pa
+clickhouse.config.connect_timeout=30
+clickhouse.config.query_timeout=300
+# flink checkpoint 0:off 1:on
+flink.enable.checkpoint.flag=0
+# api detection url
+rule.full.url=http://{{ vrrp_instance.cn_ui.virtual_ipaddress }}:8090/v1/rule/detection
+rule.inc.url=http://{{ vrrp_instance.cn_ui.virtual_ipaddress }}:8090/v1/rule/detection/increase
+# gateway host
+gateway.host={{ vrrp_instance.default.virtual_ipaddress }}
+# watermark
+watermark.seconds=1
\ No newline at end of file
diff --git a/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/README.md b/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/README.md
index c2bba95..f195e3f 100644
--- a/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/README.md
+++ b/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/README.md
@@ -1,4 +1,4 @@
-groot-stream version > 1.3.0
+groot-stream version > 1.4.0

 etl_session_record_kafka_to_cn_kafka: whether the source topic is SESSION-RECORD or SESSION-RECORD-PROCESSED depends on the deployment environment
\ No newline at end of file
diff --git a/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/etl_session_record_kafka_to_cn_kafka b/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/etl_session_record_kafka_to_cn_kafka
index 82b4855..96477e4 100644
--- a/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/etl_session_record_kafka_to_cn_kafka
+++ b/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/etl_session_record_kafka_to_cn_kafka
@@ -43,6 +43,17 @@ processing_pipelines:
         parameters:
           value_expression: "recv_time == null ? kafka_recv_time : recv_time"

+      - function: DOMAIN
+        lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
+        output_fields: [ cn_server_domain ]
+        parameters:
+          option: FIRST_SIGNIFICANT_SUBDOMAIN
+
+      - function: EVAL
+        output_fields: [ server_domain ]
+        parameters:
+          value_expression: "server_domain == null ? cn_server_domain : server_domain"
+
       - function: EVAL
         output_fields: [ domain ]
         parameters:
         value_expression:
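The new DOMAIN/EVAL pair in the CN-24.04 pipeline derives a domain from the first available name field (`http_host`, `ssl_sni`, `dtls_sni`, `quic_sni`) and only backfills `server_domain` when the probe did not already set it. A minimal sketch of one record, with invented values and assuming FIRST_SIGNIFICANT_SUBDOMAIN folds a hostname down to its registrable domain:

```yaml
# Illustrative record (values invented for the example):
#   { ssl_sni: "cdn.example.co.uk", server_domain: null }
# DOMAIN with option FIRST_SIGNIFICANT_SUBDOMAIN:
#   cn_server_domain: "example.co.uk"   # assumption about the UDF's folding behavior
# EVAL "server_domain == null ? cn_server_domain : server_domain":
#   server_domain: "example.co.uk"      # kept as-is when the probe already filled it
```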
diff --git a/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/udf.plugins b/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/udf.plugins
index eb9fd57..d482afb 100644
--- a/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/udf.plugins
+++ b/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/udf.plugins
@@ -19,3 +19,4 @@ com.geedgenetworks.core.udf.cn.ArrayElementsPrepend
 com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup
 com.geedgenetworks.core.udf.SnowflakeId
 com.geedgenetworks.core.udf.UnixTimestampConverter
+com.geedgenetworks.core.udf.Domain
diff --git a/tsg_olap/installation/clickhouse/最新全量建表语句/system.sql b/tsg_olap/installation/clickhouse/最新全量建表语句/system.sql
index 3ee0198..6bc7150 100644
--- a/tsg_olap/installation/clickhouse/最新全量建表语句/system.sql
+++ b/tsg_olap/installation/clickhouse/最新全量建表语句/system.sql
@@ -5,4 +5,5 @@ create table IF NOT EXISTS `system`.query_log_cluster ON CLUSTER ck_cluster as `
 CREATE TABLE IF NOT EXISTS `system`.columns_cluster ON CLUSTER ck_cluster AS `system`.columns ENGINE=Distributed(ck_cluster,`system`,columns,rand());
 CREATE TABLE IF NOT EXISTS `system`.processes_cluster ON CLUSTER ck_cluster AS `system`.processes ENGINE=Distributed(ck_cluster,`system`,processes,rand());
 alter table system.query_log on cluster ck_cluster modify TTL event_date + INTERVAL 60 DAY;
+CREATE TABLE IF NOT EXISTS `system`.distributed_ddl_queue_cluster ON CLUSTER ck_cluster AS `system`.distributed_ddl_queue ENGINE=Distributed(ck_cluster,`system`,distributed_ddl_queue,rand());
diff --git a/tsg_olap/installation/clickhouse/最新全量建表语句/tsg_olap_clickhouse_ddl.sql b/tsg_olap/installation/clickhouse/最新全量建表语句/tsg_olap_clickhouse_ddl.sql
index bfa7279..8d08920 100644
--- a/tsg_olap/installation/clickhouse/最新全量建表语句/tsg_olap_clickhouse_ddl.sql
+++ b/tsg_olap/installation/clickhouse/最新全量建表语句/tsg_olap_clickhouse_ddl.sql
@@ -3208,9 +3208,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record_local on clus
     device_group String,
     traffic_link_id Int32,
     source_ip String,
-    source_port Int32,
+    source_port Nullable(Int32),
     destination_ip String,
-    destination_port Int32,
+    destination_port Nullable(Int32),
     packet String,
     packet_length Int32,
     measurements String
@@ -3229,9 +3229,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record on cluster ck
     device_group String,
     traffic_link_id Int32,
     source_ip String,
-    source_port Int32,
+    source_port Nullable(Int32),
     destination_ip String,
-    destination_port Int32,
+    destination_port Nullable(Int32),
     packet String,
     packet_length Int32,
     measurements String
diff --git a/tsg_olap/installation/clickhouse/最新全量建表语句/旧部署模式建表语句/tsg_olap_clickhouse_ddl.sql b/tsg_olap/installation/clickhouse/最新全量建表语句/旧部署模式建表语句/tsg_olap_clickhouse_ddl.sql
index b29b84c..6fd5fc5 100644
--- a/tsg_olap/installation/clickhouse/最新全量建表语句/旧部署模式建表语句/tsg_olap_clickhouse_ddl.sql
+++ b/tsg_olap/installation/clickhouse/最新全量建表语句/旧部署模式建表语句/tsg_olap_clickhouse_ddl.sql
@@ -4282,9 +4282,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record_local on clus
     device_group String,
     traffic_link_id Int32,
     source_ip String,
-    source_port Int32,
+    source_port Nullable(Int32),
     destination_ip String,
-    destination_port Int32,
+    destination_port Nullable(Int32),
     packet String,
     packet_length Int32,
     measurements String
@@ -4303,9 +4303,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record on cluster ck
     device_group String,
     traffic_link_id Int32,
     source_ip String,
-    source_port Int32,
+    source_port Nullable(Int32),
     destination_ip String,
-    destination_port Int32,
+    destination_port Nullable(Int32),
     packet String,
     packet_length Int32,
     measurements String
@@ -4325,9 +4325,9 @@ ENGINE = Distributed('ck_cluster',
     device_group String,
     traffic_link_id Int32,
     source_ip String,
-    source_port Int32,
+    source_port Nullable(Int32),
     destination_ip String,
-    destination_port Int32,
+    destination_port Nullable(Int32),
     packet String,
     packet_length Int32,
     measurements String
diff --git a/tsg_olap/installation/flink/groot_stream/README.md b/tsg_olap/installation/flink/groot_stream/README.md
index be570a7..c2ef994 100644
--- a/tsg_olap/installation/flink/groot_stream/README.md
+++ b/tsg_olap/installation/flink/groot_stream/README.md
@@ -1,26 +1,31 @@
-## session_record.yaml.template
-- etl_session_record_kafka_to_ndc_kafka (A-DT) // multi-datacenter deployment: the datacenter-side Data Transporter pre-processes records, which are then aggregated at the national datacenter (NDC)
-  - Topology: kafka_source -> etl_processor -> kafka_sink
-  - Data Flow: SESSION-RECORD -> SESSION-RECORD-PROCESSED
-- session_record_processed_kafka_to_clickhouse(A-NDC) // multi-datacenter deployment: the national-datacenter side loads session records into ClickHouse
-  - Topology: kafka_source -> clickhouse_sink
-  - Data Flow: SESSION-RECORD-PROCESSED -> session_record_local
-- etl_session_record_kafka_to_clickhouse (B) // single-cluster deployment: ingest session records, pre-process them, and write to ClickHouse
-  - Topology: kafka_source -> etl_processor -> clickhouse_sink
-  - Data Flow: SESSION-RECORD -> session_record_local
+# Configuration template examples

-## realtime_log_streaming_cn_session_record.yaml.template
+## session_record.yaml.j2 (session-record ETL scenarios)
+
+- Multi-datacenter deployment: the datacenter-side Data Transporter pre-processes records, which are then aggregated at the national datacenter (NDC)
+  - etl_session_record_kafka_to_ndc_kafka (A-DT)
+    - Topology: kafka_source -> etl_processor -> kafka_sink
+    - Data Flow: SESSION-RECORD -> SESSION-RECORD-PROCESSED
+- Multi-datacenter deployment: the national-datacenter side loads session records into ClickHouse
+  - session_record_processed_kafka_to_clickhouse(A-NDC)
+    - Topology: kafka_source -> clickhouse_sink
+    - Data Flow: SESSION-RECORD-PROCESSED -> session_record_local
+- Single-cluster deployment: ingest session records, pre-process them, and write to ClickHouse
+  - etl_session_record_kafka_to_clickhouse (B)
+    - Topology: kafka_source -> etl_processor -> clickhouse_sink
+    - Data Flow: SESSION-RECORD -> session_record_local
+
+## data_transporter.yaml.j2 (data backhaul scenario)
+
+- troubleshooting_file_stream_kafka_to_ndc_kafka
+  - Topology: kafka_source -> kafka_sink (format:raw)
+  - Data Flow: TROUBLESHOOTING-FILE-STREAM-RECORD -> TROUBLESHOOTING-FILE-STREAM-RECORD
+
+## realtime_log_streaming_cn_session_record.yaml.template (pushing to other vendors / third parties)

 `install_cn_udf.sh installs the CN UDFs; grootstream.yaml defines the CN knowledge bases`

 - etl_session_record_kafka_to_cn_kafka
   - Topology: kafka_source -> etl_processor -> post_output_field_processor -> kafka_sink
   - Data Flow: SESSION-RECORD(SESSION-RECORD-PROCESSED) -> SESSION-RECORD-CN
-
-## data_transporter.yaml.template
-
-- troubleshooting_file_stream_kafka_to_ndc_kafka
-
-  - Topology: kafka_source -> kafka_sink (format:raw)
-  - Data Flow: TROUBLESHOOTING-FILE-STREAM-RECORD -> TROUBLESHOOTING-FILE-STREAM-RECORD
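Each job in this README lists one Topology; in the rewritten `*.yaml.j2` templates later in this patch the hard-coded `topology:` blocks are replaced by a single `{{ topology }}` variable, so each deployment scenario renders its own node list. A sketch of what the A-DT scenario's rendered block would look like, taken from the topology blocks that this patch removes elsewhere (the structure is grounded in this diff; the rendering itself is an assumption about how the variable is filled in):

```yaml
# Rendered topology for etl_session_record_kafka_to_ndc_kafka (A-DT):
topology:
  - name: kafka_source
    downstream: [etl_processor]
  - name: etl_processor
    downstream: [kafka_sink]
  - name: kafka_sink
    downstream: []
```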
diff --git a/tsg_olap/installation/flink/groot_stream/grootstream.yaml b/tsg_olap/installation/flink/groot_stream/grootstream.yaml
deleted file mode 100644
index a94e7e3..0000000
--- a/tsg_olap/installation/flink/groot_stream/grootstream.yaml
+++ /dev/null
@@ -1,18 +0,0 @@
-grootstream:
-  knowledge_base:
-#    - name: tsg_ip_asn
-#      fs_type: http
-#      fs_path: http://192.168.44.12:9999/v1/knowledge_base
-#      files:
-#        - f9f6bc91-2142-4673-8249-e097c00fe1ea
-
-    - name: tsg_ip_asn
-      fs_type: local
-      fs_path: /data/hdd/olap/flink/topology/data/
-      files:
-        - asn_builtin.mmdb
-  properties:
-    hos.path: http://192.168.44.12:9098/hos
-    hos.bucket.name.traffic_file: traffic_file_bucket
-    hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
-    scheduler.knowledge_base.update.interval.minutes: 5
diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_proxy_event_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_proxy_event_kafka_to_ndc_kafka
index 7ab5423..1760cf3 100644
--- a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_proxy_event_kafka_to_ndc_kafka
+++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_proxy_event_kafka_to_ndc_kafka
@@ -73,11 +73,6 @@ processing_pipelines:
         parameters:
           value_expression: recv_time

-      - function: DOMAIN
-        lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
-        output_fields: [server_domain]
-        parameters:
-          option: FIRST_SIGNIFICANT_SUBDOMAIN

       - function: BASE64_DECODE_TO_STRING
         output_fields: [mail_subject]
diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_session_record_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_session_record_kafka_to_ndc_kafka
index b852127..07b7e74 100644
--- a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_session_record_kafka_to_ndc_kafka
+++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_session_record_kafka_to_ndc_kafka
@@ -73,12 +73,6 @@ processing_pipelines:
         parameters:
           value_expression: recv_time

-      - function: DOMAIN
-        lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
-        output_fields: [server_domain]
-        parameters:
-          option: FIRST_SIGNIFICANT_SUBDOMAIN
-
       - function: BASE64_DECODE_TO_STRING
         output_fields: [mail_subject]
         parameters:
diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_transaction_record_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_transaction_record_kafka_to_ndc_kafka
index 9e7c869..bbc0353 100644
--- a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_transaction_record_kafka_to_ndc_kafka
+++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_transaction_record_kafka_to_ndc_kafka
@@ -77,12 +77,6 @@ processing_pipelines:
         parameters:
           value_expression: recv_time

-      - function: DOMAIN
-        lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
-        output_fields: [server_domain]
-        parameters:
-          option: FIRST_SIGNIFICANT_SUBDOMAIN
-
       - function: BASE64_DECODE_TO_STRING
         output_fields: [mail_subject]
         parameters:
diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/etl_voip_record_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/etl_voip_record_kafka_to_clickhouse
index da87056..da47689 100644
--- a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/etl_voip_record_kafka_to_clickhouse
+++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/etl_voip_record_kafka_to_clickhouse
@@ -73,11 +73,6 @@ processing_pipelines:
         parameters:
           value_expression: recv_time

-      - function: DOMAIN
-        lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
-        output_fields: [server_domain]
-        parameters:
-          option: FIRST_SIGNIFICANT_SUBDOMAIN
       - function: BASE64_DECODE_TO_STRING
         output_fields: [mail_subject]
diff --git a/tsg_olap/installation/flink/groot_stream/realtime_log_streaming_cn_session_record.yaml.template b/tsg_olap/installation/flink/groot_stream/realtime_log_streaming_cn_session_record.yaml.template
index 66066e1..2a011a2 100644
--- a/tsg_olap/installation/flink/groot_stream/realtime_log_streaming_cn_session_record.yaml.template
+++ b/tsg_olap/installation/flink/groot_stream/realtime_log_streaming_cn_session_record.yaml.template
@@ -383,5 +383,3 @@ application:
       downstream: [ kafka_sink ]
     - name: kafka_sink
       downstream: [ ]
-
-
diff --git a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/dos_event_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/dos_event_kafka_to_clickhouse
index b87db67..49a3d1a 100644
--- a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/dos_event_kafka_to_clickhouse
+++ b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/dos_event_kafka_to_clickhouse
@@ -3,7 +3,7 @@ sources:
     type: kafka
     properties:
       topic: DOS-EVENT
-      kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
+      kafka.bootstrap.servers: "{{ kafka_source_servers }}"
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -27,7 +27,7 @@ sinks:
   clickhouse_sink:
     type: clickhouse
     properties:
-      host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
+      host: "{{ clickhouse_servers }}"
       table: tsg_galaxy_v3.dos_event_local
       batch.size: 100000
       batch.interval: 30s
diff --git a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_datapath_telemetry_record_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_datapath_telemetry_record_kafka_to_clickhouse
index 14655a6..8ffbda3 100644
--- a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_datapath_telemetry_record_kafka_to_clickhouse
+++ b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_datapath_telemetry_record_kafka_to_clickhouse
@@ -3,7 +3,7 @@ sources:
     type: kafka
     properties:
       topic: DATAPATH-TELEMETRY-RECORD
-      kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
+      kafka.bootstrap.servers: "{{ kafka_source_servers }}"
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -46,7 +46,7 @@ sinks:
   clickhouse_sink:
     type: clickhouse
     properties:
-      host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
+      host: "{{ clickhouse_servers }}"
       table: tsg_galaxy_v3.datapath_telemetry_record_local
       batch.size: 5000
       batch.interval: 30s
diff --git a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_proxy_event_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_proxy_event_kafka_to_clickhouse
index 39ab825..19e4560 100644
--- a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_proxy_event_kafka_to_clickhouse
+++ b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_proxy_event_kafka_to_clickhouse
@@ -7,7 +7,7 @@ sources:
     # watermark_lag: 60             # [number] Watermark Lag, default is 60
     properties:
       topic: PROXY-EVENT
-      kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
+      kafka.bootstrap.servers: "{{ kafka_source_servers }}"
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -71,12 +71,6 @@ processing_pipelines:
         parameters:
           value_expression: recv_time

-      - function: DOMAIN
-        lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
-        output_fields: [server_domain]
-        parameters:
-          option: FIRST_SIGNIFICANT_SUBDOMAIN
-
       - function: BASE64_DECODE_TO_STRING
         output_fields: [mail_subject]
         parameters:
@@ -123,7 +117,7 @@ sinks:
   clickhouse_sink:
     type: clickhouse
     properties:
-      host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
+      host: "{{ clickhouse_servers }}"
       table: tsg_galaxy_v3.proxy_event_local
       batch.size: 100000
       batch.interval: 30s
diff --git a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_session_record_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_session_record_kafka_to_clickhouse
index 643fa48..ada2632 100644
--- a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_session_record_kafka_to_clickhouse
+++ b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_session_record_kafka_to_clickhouse
@@ -7,7 +7,7 @@ sources:
     # watermark_lag: 60             # [number] Watermark Lag, default is 60
     properties:
       topic: SESSION-RECORD
-      kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
+      kafka.bootstrap.servers: "{{ kafka_source_servers }}"
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -71,12 +71,6 @@ processing_pipelines:
         parameters:
           value_expression: recv_time

-      - function: DOMAIN
-        lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
-        output_fields: [server_domain]
-        parameters:
-          option: FIRST_SIGNIFICANT_SUBDOMAIN
-
       - function: BASE64_DECODE_TO_STRING
         output_fields: [mail_subject]
         parameters:
@@ -123,7 +117,7 @@ sinks:
   clickhouse_sink:
     type: clickhouse
     properties:
-      host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
+      host: "{{ clickhouse_servers }}"
       table: tsg_galaxy_v3.session_record_local
       batch.size: 100000
       batch.interval: 30s
diff --git a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse
new file mode 100644
index 0000000..016650c
--- /dev/null
+++ b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse
@@ -0,0 +1,92 @@
+sources:
+  kafka_source:
+    type: kafka
+    properties:
+      topic: TRAFFIC-SKETCH-METRIC
+      kafka.bootstrap.servers: "{{ kafka_source_servers }}"
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.security.protocol: SASL_PLAINTEXT
+      kafka.sasl.mechanism: PLAIN
+      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+      kafka.group.id: etl_traffic_sketch_metric
+      kafka.auto.offset.reset: latest
+      kafka.compression.type: none
+    format: json
+
+processing_pipelines:
+  etl_processor:                     # [object] Processing Pipeline
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields:
+    output_fields:
+    functions:                       # [array of object] Function List
+
+      - function: FLATTEN
+        lookup_fields: [ fields,tags ]
+        output_fields: [ ]
+        parameters:
+          #prefix: ""
+          depth: 3
+          # delimiter: "."
+
+      - function: RENAME
+        lookup_fields: [ '' ]
+        output_fields: [ '' ]
+        filter:
+        parameters:
+          # parent_fields: [tags]
+          #rename_fields:
+          #  tags: tags
+          rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+      - function: EVAL
+        output_fields: [ internal_ip ]
+        parameters:
+          value_expression: "direction == 'Outbound' ? client_ip : server_ip"
+      - function: EVAL
+        output_fields: [ external_ip ]
+        parameters:
+          value_expression: "direction == 'Outbound' ? server_ip : client_ip"
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ timestamp_ms ]
+        output_fields: [ recv_time ]
+        parameters:
+          precision: seconds
+
+      - function: SNOWFLAKE_ID
+        lookup_fields: [ '' ]
+        output_fields: [ log_id ]
+        filter:
+        parameters:
+          data_center_id_num: 1
+
+
+sinks:
+  clickhouse_sink:
+    type: clickhouse
+    properties:
+      host: "{{ clickhouse_servers }}"
+      table: tsg_galaxy_v3.traffic_sketch_metric_local
+      batch.size: 100000
+      batch.interval: 30s
+      connection.user: e54c9568586180eede1506eecf3574e9
+      connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+
+
+application:
+
+  env:                               # [object] Environment Variables
+    name: etl_traffic_sketch_metric  # [string] Job Name
+    shade.identifier: aes
+  pipeline:
+    object-reuse: true               # [boolean] Object Reuse, default is false
+  topology:
+    - name: kafka_source
+      downstream: [etl_processor]
+    - name: etl_processor
+      downstream: [clickhouse_sink]
+    - name: clickhouse_sink
+
+
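TRAFFIC-SKETCH-METRIC records arrive in a nested envelope with `fields` and `tags` maps; FLATTEN (depth 3) hoists the nested keys to `fields.xxx` / `tags.xxx`, and the RENAME expression then strips both prefixes. A minimal before/after sketch of one record, with invented values:

```yaml
# Illustrative input envelope:
#   { tags: { direction: Outbound, app: dns }, fields: { bytes: 1024, sessions: 3 } }
# After FLATTEN (depth: 3):
#   { tags.direction: Outbound, tags.app: dns, fields.bytes: 1024, fields.sessions: 3 }
# After RENAME (replace_all strips 'tags.' and 'fields.'):
#   { direction: Outbound, app: dns, bytes: 1024, sessions: 3 }
# The EVALs then pick internal_ip/external_ip from client_ip/server_ip by direction.
```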
diff --git a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_transaction_record_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_transaction_record_kafka_to_clickhouse
index 4d28714..d4dbd87 100644
--- a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_transaction_record_kafka_to_clickhouse
+++ b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_transaction_record_kafka_to_clickhouse
@@ -7,7 +7,7 @@ sources:
     # watermark_lag: 60             # [number] Watermark Lag, default is 60
     properties:
       topic: TRANSACTION-RECORD
-      kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
+      kafka.bootstrap.servers: "{{ kafka_source_servers }}"
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -71,12 +71,6 @@ processing_pipelines:
         parameters:
           value_expression: recv_time

-      - function: DOMAIN
-        lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
-        output_fields: [server_domain]
-        parameters:
-          option: FIRST_SIGNIFICANT_SUBDOMAIN
-
       - function: BASE64_DECODE_TO_STRING
         output_fields: [mail_subject]
         parameters:
@@ -123,7 +117,7 @@ sinks:
   clickhouse_sink:
     type: clickhouse
     properties:
-      host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
+      host: "{{ clickhouse_servers }}"
       table: tsg_galaxy_v3.transaction_record_local
       batch.size: 100000
       batch.interval: 30s
diff --git a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_voip_record_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_voip_record_kafka_to_clickhouse
index 90d3179..d798e94 100644
--- a/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_voip_record_kafka_to_clickhouse
+++ b/tsg_olap/installation/flink/groot_stream/single-cluster-examples/etl_voip_record_kafka_to_clickhouse
@@ -7,7 +7,7 @@ sources:
     # watermark_lag: 60             # [number] Watermark Lag, default is 60
     properties:
       topic: VOIP-CONVERSATION-RECORD
-      kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
+      kafka.bootstrap.servers: "{{ kafka_source_servers }}"
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -71,12 +71,6 @@ processing_pipelines:
         parameters:
           value_expression: recv_time

-      - function: DOMAIN
-        lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
-        output_fields: [server_domain]
-        parameters:
-          option: FIRST_SIGNIFICANT_SUBDOMAIN
-
       - function: BASE64_DECODE_TO_STRING
         output_fields: [mail_subject]
         parameters:
@@ -123,7 +117,7 @@ sinks:
   clickhouse_sink:
     type: clickhouse
     properties:
-      host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
+      host: "{{ clickhouse_servers }}"
       table: tsg_galaxy_v3.voip_record_local
       batch.size: 100000
       batch.interval: 30s
diff --git a/tsg_olap/installation/flink/groot_stream/data_transporter.yaml.template b/tsg_olap/installation/flink/groot_stream/templates/data_transporter.yaml.j2
similarity index 76%
rename from tsg_olap/installation/flink/groot_stream/data_transporter.yaml.template
rename to tsg_olap/installation/flink/groot_stream/templates/data_transporter.yaml.j2
index a645031..e5c6273 100644
--- a/tsg_olap/installation/flink/groot_stream/data_transporter.yaml.template
+++ b/tsg_olap/installation/flink/groot_stream/templates/data_transporter.yaml.j2
@@ -3,7 +3,7 @@ sources:
     type: kafka
     properties:
       topic: {{ kafka_source_topic }}
-      kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
+      kafka.bootstrap.servers: {{ kafka_source_servers }}
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -19,7 +19,7 @@ sinks:
     type: kafka
     properties:
       topic: {{ kafka_sink_topic }}
-      kafka.bootstrap.servers: { { kafka_sink_bootstrap_servers } }
+      kafka.bootstrap.servers: {{ kafka_sink_servers }}
       kafka.retries: 0
       kafka.linger.ms: 10
       kafka.request.timeout.ms: 30000
@@ -32,6 +32,16 @@ sinks:
       kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
     format: raw
+  clickhouse_sink:
+    type: clickhouse
+    properties:
+      host: {{ clickhouse_servers }}
+      table: tsg_galaxy_v3.traffic_sketch_metric_local
+      batch.size: 100000
+      batch.interval: 30s
+      connection.user: e54c9568586180eede1506eecf3574e9
+      connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+
 application:
   env:
     name: {{ job_name }}
@@ -39,9 +49,6 @@ application:
   pipeline:
     object-reuse: true
-  topology:
-    - name: kafka_source
-      downstream: [kafka_sink]
-    - name: kafka_sink
-      downstream: []
+  {{ topology }}
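With the sinks section of data_transporter.yaml.j2 now carrying both a kafka_sink and a clickhouse_sink, a single hard-coded kafka_source -> kafka_sink topology no longer fits every job, which is why the fixed block gives way to a `{{ topology }}` variable. A plausible rendering for the original backhaul job, matching the block the template removed (the variable name and rendering mechanism are as shown in this patch; the rendered value itself is an assumption):

```yaml
# data_transporter.yaml.j2 rendered for troubleshooting_file_stream_kafka_to_ndc_kafka:
topology:
  - name: kafka_source
    downstream: [kafka_sink]
  - name: kafka_sink
    downstream: []
```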
diff --git a/tsg_olap/installation/flink/groot_stream/etl_datapath_telemetry_record.yaml.template b/tsg_olap/installation/flink/groot_stream/templates/datapath_telemetry_record.yaml.j2
similarity index 65%
rename from tsg_olap/installation/flink/groot_stream/etl_datapath_telemetry_record.yaml.template
rename to tsg_olap/installation/flink/groot_stream/templates/datapath_telemetry_record.yaml.j2
index 0dc3255..58da17c 100644
--- a/tsg_olap/installation/flink/groot_stream/etl_datapath_telemetry_record.yaml.template
+++ b/tsg_olap/installation/flink/groot_stream/templates/datapath_telemetry_record.yaml.j2
@@ -2,8 +2,8 @@ sources:
   kafka_source:
     type: kafka
     properties:
-      topic: {{ kafka_source_topic }}
-      kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
+      topic: DATAPATH-TELEMETRY-RECORD
+      kafka.bootstrap.servers: {{ kafka_source_servers }}
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -34,6 +34,25 @@ processing_pipelines:
           value_field: packet

 sinks:
+  kafka_sink:
+    type: kafka
+    properties:
+      topic: DATAPATH-TELEMETRY-RECORD
+      kafka.bootstrap.servers: {{ kafka_sink_servers }}
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      kafka.security.protocol: SASL_PLAINTEXT
+      kafka.sasl.mechanism: PLAIN
+      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+    format: raw
+    json.ignore.parse.errors: false
+    log.failures.only: true
+
   clickhouse_sink:
     type: clickhouse
     properties:
@@ -53,11 +72,6 @@ application:
   pipeline:
     object-reuse: true
-  topology:
-    - name: kafka_source
-      downstream: [etl_processor]
-    - name: etl_processor
-      downstream: [clickhouse_sink]
-    - name: clickhouse_sink
-      downstream: []
+  {{ topology }}
diff --git a/tsg_olap/installation/flink/groot_stream/templates/dos_event_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/templates/dos_event_kafka_to_clickhouse
new file mode 100644
index 0000000..49a3d1a
--- /dev/null
+++ b/tsg_olap/installation/flink/groot_stream/templates/dos_event_kafka_to_clickhouse
@@ -0,0 +1,49 @@
+sources:
+  kafka_source:
+    type: kafka
+    properties:
+      topic: DOS-EVENT
+      kafka.bootstrap.servers: "{{ kafka_source_servers }}"
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.security.protocol: SASL_PLAINTEXT
+      kafka.ssl.keystore.location:
+      kafka.ssl.keystore.password:
+      kafka.ssl.truststore.location:
+      kafka.ssl.truststore.password:
+      kafka.ssl.key.password:
+      kafka.sasl.mechanism: PLAIN
+      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+      kafka.buffer.memory:
+      kafka.group.id: dos_event_kafka_to_clickhouse-20231221
+      kafka.auto.offset.reset: latest
+      kafka.max.request.size:
+      kafka.compression.type: none
+    format: json
+
+
+sinks:
+  clickhouse_sink:
+    type: clickhouse
+    properties:
+      host: "{{ clickhouse_servers }}"
+      table: tsg_galaxy_v3.dos_event_local
+      batch.size: 100000
+      batch.interval: 30s
+      connection.user: e54c9568586180eede1506eecf3574e9
+      connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+
+
+application:
+  env:
+    name: dos_event_kafka_to_clickhouse
+    shade.identifier: aes
+  pipeline:
+    object-reuse: true               # [boolean] Object Reuse, default is false
+  topology:
+    - name: kafka_source
+      downstream: [clickhouse_sink]
+    - name: clickhouse_sink
+
+
diff --git a/tsg_olap/installation/flink/groot_stream/proxy_event.yaml.template b/tsg_olap/installation/flink/groot_stream/templates/proxy_event.yaml.j2
similarity index 92%
rename from tsg_olap/installation/flink/groot_stream/proxy_event.yaml.template
rename to tsg_olap/installation/flink/groot_stream/templates/proxy_event.yaml.j2
index fcb18a1..cee0548 100644
--- a/tsg_olap/installation/flink/groot_stream/proxy_event.yaml.template
+++ b/tsg_olap/installation/flink/groot_stream/templates/proxy_event.yaml.j2
@@ -2,8 +2,8 @@ sources:
   kafka_source:
     type: kafka
     properties:
-      topic: {{ kafka_source_topic }}
-      kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
+      topic: PROXY-EVENT
+      kafka.bootstrap.servers: {{ kafka_source_servers }}
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -107,8 +107,8 @@ sinks:
   kafka_sink:
     type: kafka
     properties:
-      topic: {{ kafka_sink_topic }}
-      kafka.bootstrap.servers: { { kafka_sink_bootstrap_servers } }
+      topic: PROXY-EVENT
+      kafka.bootstrap.servers: {{ kafka_sink_servers }}
       kafka.retries: 0
       kafka.linger.ms: 10
       kafka.request.timeout.ms: 30000
@@ -142,11 +142,5 @@ application:
   pipeline:
     object-reuse: true
-  topology:
-    - name: kafka_source
-      downstream: [etl_processor]
-    - name: etl_processor
-      downstream: [clickhouse_sink]
-    - name: clickhouse_sink
-      downstream: []
-
+  {{ topology }}
diff --git a/tsg_olap/installation/flink/groot_stream/session_record.yaml.template b/tsg_olap/installation/flink/groot_stream/templates/session_record.yaml.j2
similarity index 92%
rename from tsg_olap/installation/flink/groot_stream/session_record.yaml.template
rename to tsg_olap/installation/flink/groot_stream/templates/session_record.yaml.j2
index 49e02a1..8938cd2 100644
--- a/tsg_olap/installation/flink/groot_stream/session_record.yaml.template
+++ b/tsg_olap/installation/flink/groot_stream/templates/session_record.yaml.j2
@@ -2,8 +2,8 @@ sources:
   kafka_source:
     type: kafka
     properties:
-      topic: {{ kafka_source_topic }}
-      kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
+      topic: SESSION-RECORD
+      kafka.bootstrap.servers: {{ kafka_source_servers }}
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -107,8 +107,8 @@ sinks:
   kafka_sink:
     type: kafka
     properties:
-      topic: {{ kafka_sink_topic }}
-      kafka.bootstrap.servers: { { kafka_sink_bootstrap_servers } }
+      topic: SESSION-RECORD
+      kafka.bootstrap.servers: {{ kafka_sink_servers }}
       kafka.retries: 0
       kafka.linger.ms: 10
       kafka.request.timeout.ms: 30000
@@ -141,12 +141,4 @@ application:
     shade.identifier: aes
   pipeline:
     object-reuse: true
-  topology:
-    - name: kafka_source
-      downstream: [etl_processor]
-    - name: etl_processor
-      downstream: [clickhouse_sink]
-    - name: clickhouse_sink
-      downstream: []
-
-
+  {{ topology }}
\ No newline at end of file
diff --git a/tsg_olap/installation/flink/groot_stream/templates/traffic_sketch_metric.yaml.j2 b/tsg_olap/installation/flink/groot_stream/templates/traffic_sketch_metric.yaml.j2
new file mode 100644
index 0000000..0a048a0
--- /dev/null
+++ b/tsg_olap/installation/flink/groot_stream/templates/traffic_sketch_metric.yaml.j2
@@ -0,0 +1,106 @@
+sources:
+  kafka_source:
+    type: kafka
+    properties:
+      topic: TRAFFIC-SKETCH-METRIC
+      kafka.bootstrap.servers: {{ kafka_source_servers }}
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.security.protocol: SASL_PLAINTEXT
+      kafka.sasl.mechanism: PLAIN
+      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+      kafka.group.id: etl_traffic_sketch_metric
+      kafka.auto.offset.reset: latest
+      kafka.compression.type: none
+    format: json
+
+processing_pipelines:
+  etl_processor:                     # [object] Processing Pipeline
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields:
+    output_fields:
+    functions:                       # [array of object] Function List
+
+      - function: FLATTEN
+        lookup_fields: [ fields,tags ]
+        output_fields: [ ]
+        parameters:
+          #prefix: ""
+          depth: 3
+          # delimiter: "."
+
+      - function: RENAME
+        lookup_fields: [ '' ]
+        output_fields: [ '' ]
+        filter:
+        parameters:
+          # parent_fields: [tags]
+          #rename_fields:
+          #  tags: tags
+          rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key;
+
+      - function: EVAL
+        output_fields: [ internal_ip ]
+        parameters:
+          value_expression: "direction == 'Outbound' ? client_ip : server_ip"
+      - function: EVAL
+        output_fields: [ external_ip ]
+        parameters:
+          value_expression: "direction == 'Outbound' ? server_ip : client_ip"
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [ timestamp_ms ]
+        output_fields: [ recv_time ]
+        parameters:
+          precision: seconds
+
+      - function: SNOWFLAKE_ID
+        lookup_fields: [ '' ]
+        output_fields: [ log_id ]
+        filter:
+        parameters:
+          data_center_id_num: 1
+
+
+sinks:
+  kafka_sink:
+    type: kafka
+    properties:
+      topic: TRAFFIC-SKETCH-METRIC
+      kafka.bootstrap.servers: {{ kafka_sink_servers }}
+      kafka.retries: 0
+      kafka.linger.ms: 10
+      kafka.request.timeout.ms: 30000
+      kafka.batch.size: 262144
+      kafka.buffer.memory: 134217728
+      kafka.max.request.size: 10485760
+      kafka.compression.type: snappy
+      kafka.security.protocol: SASL_PLAINTEXT
+      kafka.sasl.mechanism: PLAIN
+      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+    format: json
+    json.ignore.parse.errors: false
+    log.failures.only: true
+
+  clickhouse_sink:
+    type: clickhouse
+    properties:
+      host: {{ clickhouse_servers }}
+      table: tsg_galaxy_v3.traffic_sketch_metric_local
+      batch.size: 100000
+      batch.interval: 30s
+      connection.user: e54c9568586180eede1506eecf3574e9
+      connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
+
+
+application:
+
+  env:                               # [object] Environment Variables
+    name: etl_traffic_sketch_metric  # [string] Job Name
+    shade.identifier: aes
+  pipeline:
+    object-reuse: true               # [boolean] Object Reuse, default is false
+  {{ topology }}
+
+
diff --git a/tsg_olap/installation/flink/groot_stream/transaction_record.yaml.template b/tsg_olap/installation/flink/groot_stream/templates/transaction_record.yaml.j2
similarity index 92%
rename from tsg_olap/installation/flink/groot_stream/transaction_record.yaml.template
rename to tsg_olap/installation/flink/groot_stream/templates/transaction_record.yaml.j2
index 60dcfe2..9c8fa4b 100644
--- a/tsg_olap/installation/flink/groot_stream/transaction_record.yaml.template
+++ b/tsg_olap/installation/flink/groot_stream/templates/transaction_record.yaml.j2
@@ -2,8 +2,8 @@ sources:
   kafka_source:
     type: kafka
     properties:
-      topic: {{ kafka_source_topic }}
-      kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
+      topic: TRANSACTION-RECORD
+      kafka.bootstrap.servers: {{ kafka_source_servers }}
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -107,8 +107,8 @@ sinks:
   kafka_sink:
     type: kafka
     properties:
-      topic: {{ kafka_sink_topic }}
-      kafka.bootstrap.servers: { { kafka_sink_bootstrap_servers } }
+      topic: TRANSACTION-RECORD
+      kafka.bootstrap.servers: {{ kafka_sink_servers }}
       kafka.retries: 0
       kafka.linger.ms: 10
       kafka.request.timeout.ms: 30000
@@ -141,12 +141,4 @@ application:
     shade.identifier: aes
   pipeline:
     object-reuse: true
-  topology:
-    - name: kafka_source
-      downstream: [etl_processor]
-    - name: etl_processor
-      downstream: [clickhouse_sink]
-    - name: clickhouse_sink
-      downstream: []
-
-
+  {{ topology }}
diff --git a/tsg_olap/installation/flink/groot_stream/voip_record.yaml.template b/tsg_olap/installation/flink/groot_stream/templates/voip_record.yaml.j2
similarity index 92%
rename from tsg_olap/installation/flink/groot_stream/voip_record.yaml.template
rename to tsg_olap/installation/flink/groot_stream/templates/voip_record.yaml.j2
index 671d1c0..1a561f2 100644
--- a/tsg_olap/installation/flink/groot_stream/voip_record.yaml.template
+++ b/tsg_olap/installation/flink/groot_stream/templates/voip_record.yaml.j2
@@ -2,8 +2,8 @@ sources:
   kafka_source:
     type: kafka
     properties:
-      topic: {{ kafka_source_topic }}
-      kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
+      topic: VOIP-CONVERSATION-RECORD
+      kafka.bootstrap.servers: {{ kafka_source_servers }}
       kafka.session.timeout.ms: 60000
       kafka.max.poll.records: 3000
       kafka.max.partition.fetch.bytes: 31457280
@@ -107,8 +107,8 @@ sinks:
   kafka_sink:
     type: kafka
     properties:
-      topic: {{ kafka_sink_topic }}
-      kafka.bootstrap.servers: { { kafka_sink_bootstrap_servers } }
+      topic: VOIP-CONVERSATION-RECORD
+      kafka.bootstrap.servers: {{ kafka_sink_servers }}
       kafka.retries: 0
       kafka.linger.ms: 10
       kafka.request.timeout.ms: 30000
@@ -141,12 +141,4 @@ application:
     shade.identifier: aes
   pipeline:
     object-reuse: true
-  topology:
-    - name: kafka_source
-      downstream: [etl_processor]
-    - name: etl_processor
-      downstream: [clickhouse_sink]
-    - name: clickhouse_sink
-      downstream: []
-
-
+  {{ topology }}
\ No newline at end of file
diff --git a/tsg_olap/installation/flink/groot_stream/udf.plugins b/tsg_olap/installation/flink/groot_stream/udf.plugins
deleted file mode 100644
index 8d7a1d6..0000000
--- a/tsg_olap/installation/flink/groot_stream/udf.plugins
+++ /dev/null
@@ -1,16 +0,0 @@
-com.geedgenetworks.core.udf.AsnLookup
-com.geedgenetworks.core.udf.CurrentUnixTimestamp
-com.geedgenetworks.core.udf.DecodeBase64
-com.geedgenetworks.core.udf.Domain
-com.geedgenetworks.core.udf.Drop
-com.geedgenetworks.core.udf.EncodeBase64
-com.geedgenetworks.core.udf.Eval
-com.geedgenetworks.core.udf.FromUnixTimestamp
-com.geedgenetworks.core.udf.GenerateStringArray
-com.geedgenetworks.core.udf.GeoIpLookup
-com.geedgenetworks.core.udf.JsonExtract
-com.geedgenetworks.core.udf.PathCombine
-com.geedgenetworks.core.udf.Rename
-com.geedgenetworks.core.udf.SnowflakeId
-com.geedgenetworks.core.udf.StringJoiner
-com.geedgenetworks.core.udf.UnixTimestampConverter
diff --git a/tsg_olap/upgrade/TSG-24.06/clickhouse/tsg_olap_clickhouse_ddl_24.06.sql b/tsg_olap/upgrade/TSG-24.06/clickhouse/tsg_olap_clickhouse_ddl_24.06.sql
index bfa7279..bc004dd 100644
--- a/tsg_olap/upgrade/TSG-24.06/clickhouse/tsg_olap_clickhouse_ddl_24.06.sql
+++ b/tsg_olap/upgrade/TSG-24.06/clickhouse/tsg_olap_clickhouse_ddl_24.06.sql
@@ -3208,9 +3208,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record_local on clus
     device_group String,
     traffic_link_id Int32,
     source_ip String,
-    source_port Int32,
+    source_port Nullable(Int32),
     destination_ip String,
-    destination_port Int32,
+    destination_port Nullable(Int32),
     packet String,
     packet_length Int32,
     measurements String
@@ -3229,9 +3229,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record on cluster ck
     device_group String,
     traffic_link_id Int32,
     source_ip String,
-    source_port Int32,
+    source_port Nullable(Int32),
     destination_ip String,
-    destination_port Int32,
+    destination_port Nullable(Int32),
     packet String,
     packet_length Int32,
     measurements String
@@ -3240,4 +3240,101 @@ ENGINE = Distributed('ck_cluster',
                      'tsg_galaxy_v3',
                      'datapath_telemetry_record_local',
                      rand());
-
\ No newline at end of file
+
+CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.traffic_sketch_metric_local on cluster ck_cluster
+(
+    log_id UInt64,
+    recv_time Int64,
+    vsys_id Int64,
+    device_id String,
+    device_group String,
+    data_center String,
+    direction String,
+    ip_protocol String,
+    client_ip String,
+    server_ip String,
+    internal_ip String,
+    external_ip String,
+    client_country String,
+    server_country String,
+    client_asn Nullable(Int64),
+    server_asn Nullable(Int64),
+    server_fqdn String,
+    server_domain String,
+    app String,
+    app_category String,
+    c2s_ttl Nullable(Int32),
+    s2c_ttl Nullable(Int32),
+    c2s_link_id Nullable(Int32),
+    s2c_link_id Nullable(Int32),
+    sessions Int64,
+    bytes Int64,
+    sent_bytes Int64,
+    received_bytes Int64,
+    pkts Int64,
+    sent_pkts Int64,
+    received_pkts Int64,
+    asymmetric_c2s_flows Int64,
+    asymmetric_s2c_flows Int64,
+    c2s_fragments Int64,
+    s2c_fragments Int64,
+    c2s_tcp_lost_bytes Int64,
+    s2c_tcp_lost_bytes Int64,
+    c2s_tcp_retransmitted_pkts Int64,
+    s2c_tcp_retransmitted_pkts Int64
+)
+ENGINE = MergeTree
+PARTITION BY toYYYYMMDD(toDate(recv_time))
+ORDER BY (vsys_id,
+          direction,
+          ip_protocol,
+          app,
+          client_ip,
+          recv_time);
+
+CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.traffic_sketch_metric on cluster ck_cluster
+(
+    log_id UInt64,
+    recv_time Int64,
+    vsys_id Int64,
+    device_id String,
+    device_group String,
+    data_center String,
+    direction String,
+    ip_protocol String,
+    client_ip String,
+    server_ip String,
+    internal_ip String,
+    external_ip String,
+    client_country String,
+    server_country String,
+    client_asn Nullable(Int64),
+    server_asn Nullable(Int64),
+    server_fqdn String,
+    server_domain String,
+    app String,
+    app_category String,
+    c2s_ttl Nullable(Int32),
+    s2c_ttl Nullable(Int32),
+    c2s_link_id Nullable(Int32),
+    s2c_link_id Nullable(Int32),
+    sessions Int64,
+    bytes Int64,
+    sent_bytes Int64,
+    received_bytes Int64,
+    pkts Int64,
+    sent_pkts Int64,
+    received_pkts Int64,
+    asymmetric_c2s_flows Int64,
+    asymmetric_s2c_flows Int64,
+    c2s_fragments Int64,
+    s2c_fragments Int64,
+    c2s_tcp_lost_bytes Int64,
+    s2c_tcp_lost_bytes Int64,
+    c2s_tcp_retransmitted_pkts Int64,
+    s2c_tcp_retransmitted_pkts Int64
+)
+ENGINE = Distributed('ck_cluster',
+                     'tsg_galaxy_v3',
+                     'traffic_sketch_metric_local',
+                     rand());
\ No newline at end of file
diff --git a/tsg_olap/upgrade/TSG-24.06/clickhouse/tsg_olap_clickhouse_ddl_check_24.06.sql b/tsg_olap/upgrade/TSG-24.06/clickhouse/tsg_olap_clickhouse_ddl_check_24.06.sql
index ad561df..275b5fa 100644
--- a/tsg_olap/upgrade/TSG-24.06/clickhouse/tsg_olap_clickhouse_ddl_check_24.06.sql
+++ b/tsg_olap/upgrade/TSG-24.06/clickhouse/tsg_olap_clickhouse_ddl_check_24.06.sql
@@ -16,6 +16,7 @@ SELECT recv_time, log_id, decoded_as, session_id, start_timestamp_ms, end_timest
 FROM tsg_galaxy_v3.voip_record where recv_time >= toUnixTimestamp('2030-01-01 00:00:00') AND recv_time = toUnixTimestamp('2030-01-01 00:00:00') AND recv_time = toUnixTimestamp('2030-01-01 00:00:00') AND recv_time
diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/multi-datacenter-examples/datacenter_dt/etl_transaction_record_kafka_to_ndc_kafka b/tsg_olap/upgrade/TSG-24.06/groot_stream/multi-datacenter-examples/datacenter_dt/etl_transaction_record_kafka_to_ndc_kafka
new file mode 100644
--- /dev/null
+++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/multi-datacenter-examples/datacenter_dt/etl_transaction_record_kafka_to_ndc_kafka
+sources:
+  kafka_source:
+    type: kafka
+    # fields:                       # [array of object] Field List, if not set, all fields(Map) will be output.
+    # watermark_timestamp: common_recv_time  # [string] Watermark Field Name
+    # watermark_timestamp_unit: ms  # [string] Watermark Unit, default is ms
+    # watermark_lag: 60             # [number] Watermark Lag, default is 60
+    properties:
+      topic: TRANSACTION-RECORD
+      kafka.bootstrap.servers: "{{ kafka_source_servers }}"
+      kafka.session.timeout.ms: 60000
+      kafka.max.poll.records: 3000
+      kafka.max.partition.fetch.bytes: 31457280
+      kafka.security.protocol: SASL_PLAINTEXT
+      kafka.sasl.mechanism: PLAIN
+      kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+      kafka.group.id: etl_transaction_record_kafka_to_ndc_kafka
+      kafka.auto.offset.reset: latest
+    format: json
+
+processing_pipelines:
+  etl_processor:                     # [object] Processing Pipeline
+    type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
+    remove_fields:
+    output_fields:
+    properties:
+      key: value
+    functions:                       # [array of object] Function List
+
+      - function: ASN_LOOKUP
+        lookup_fields: [server_ip]
+        output_fields: [server_asn]
+        parameters:
+          option: IP_TO_ASN
+          kb_name: tsg_ip_asn
+
+      - function: ASN_LOOKUP
+        lookup_fields: [client_ip]
+        output_fields: [client_asn]
+        parameters:
+          option: IP_TO_ASN
+          kb_name: tsg_ip_asn
+
+      - function: SNOWFLAKE_ID
+        lookup_fields: ['']
+        output_fields: [log_id]
+        parameters:
+          data_center_id_num: 1
+
+      - function: JSON_EXTRACT
+        lookup_fields: [device_tag]
+        output_fields: [data_center]
+        filter:
+        parameters:
+          value_expression: $.tags[?(@.tag=='data_center')][0].value
+
+      - function: JSON_EXTRACT
+        lookup_fields: [device_tag]
+        output_fields: [device_group]
+        filter:
+        parameters:
+          value_expression: $.tags[?(@.tag=='device_group')][0].value
+
+      - function: CURRENT_UNIX_TIMESTAMP
+        output_fields: [processing_time]
+        parameters:
+          precision: seconds
+
+      - function: UNIX_TIMESTAMP_CONVERTER
+        lookup_fields: [__timestamp]
+        output_fields: [recv_time]
+        parameters:
+          precision: seconds
+
+      - function: EVAL
+        output_fields: [ingestion_time]
+        parameters:
+          value_expression: recv_time
+
+      - function: BASE64_DECODE_TO_STRING
+        output_fields: [mail_subject]
+        parameters:
+          value_field: mail_subject
+          charset_field: mail_subject_charset
+
+      - function: BASE64_DECODE_TO_STRING
+        output_fields: [mail_attachment_name]
+        parameters:
+          value_field: mail_attachment_name
+          charset_field: mail_attachment_name_charset
+
+      - function: PATH_COMBINE
+        lookup_fields: [rtp_pcap_path]
+        output_fields: [rtp_pcap_path]
+        parameters:
+          path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
+
+      - function: PATH_COMBINE
+        lookup_fields: [http_request_body]
+        output_fields: [http_request_body]
+        parameters:
+          path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
+
+      - function: PATH_COMBINE
+        lookup_fields: [http_response_body]
+        output_fields: [http_response_body]
+        parameters:
+          path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
+
+      - function: PATH_COMBINE
+        lookup_fields: [mail_eml_file]
+        output_fields: [mail_eml_file]
+        parameters:
+          path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + +sinks: + kafka_sink: + type : kafka + properties: + topic: TRANSACTION-RECORD-PROCESSED + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + + +application: + + env: # [object] Environment Variables + name: etl_transaction_record_kafka_to_ndc_kafka # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [etl_processor] + - name: etl_processor + downstream: [kafka_sink] + - name: kafka_sink + + diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/multi-datacenter-examples/national_datacenter/etl_voip_record_kafka_to_clickhouse b/tsg_olap/upgrade/TSG-24.06/groot_stream/multi-datacenter-examples/national_datacenter/etl_voip_record_kafka_to_clickhouse new file mode 100644 index 0000000..da47689 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/multi-datacenter-examples/national_datacenter/etl_voip_record_kafka_to_clickhouse @@ -0,0 +1,118 @@ +sources: + kafka_source: + type: kafka + properties: + topic: VOIP-CONVERSATION-RECORD + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_voip_record_kafka_to_clickhouse + kafka.auto.offset.reset: latest + format: json + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + properties: + key: value + functions: # [array of object] Function List + + - function: ASN_LOOKUP + lookup_fields: [server_ip] + output_fields: [server_asn] + parameters: + option: IP_TO_ASN + kb_name: tsg_ip_asn + + - function: ASN_LOOKUP + lookup_fields: [client_ip] + output_fields: [client_asn] + parameters: + option: IP_TO_ASN + kb_name: tsg_ip_asn + + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [processing_time] + parameters: + precision: seconds + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: 
[__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + table: tsg_galaxy_v3.voip_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + + +application: + + env: # [object] Environment Variables + name: etl_voip_record_kafka_to_clickhouse # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [etl_processor] + - name: etl_processor + downstream: [clickhouse_sink] + - name: clickhouse_sink + + diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_proxy_event_kafka_to_clickhouse b/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_proxy_event_kafka_to_clickhouse new file mode 100644 index 0000000..19e4560 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_proxy_event_kafka_to_clickhouse @@ -0,0 +1,142 @@ +sources: + kafka_source: + type: kafka + # fields: # [array of object] Field List, if not set, all fields(Map) will be output. + # watermark_timestamp: common_recv_time # [string] Watermark Field Name + # watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms + # watermark_lag: 60 # [number] Watermark Lag, default is 60 + properties: + topic: PROXY-EVENT + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.ssl.keystore.location: + kafka.ssl.keystore.password: + kafka.ssl.truststore.location: + kafka.ssl.truststore.password: + kafka.ssl.key.password: + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.buffer.memory: + kafka.group.id: etl_proxy_event_kafka_to_clickhouse-20231221 + kafka.auto.offset.reset: latest + kafka.max.request.size: + kafka.compression.type: none + format: json + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + properties: + key: value + functions: # [array of object] Function List + + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: 
$.tags[?(@.tag=='device_group')][0].value + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [processing_time] + parameters: + precision: seconds + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + table: tsg_galaxy_v3.proxy_event_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + + env: # [object] Environment Variables + name: etl_proxy_event_kafka_to_clickhouse # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [etl_processor] + - name: etl_processor + downstream: [clickhouse_sink] + - name: clickhouse_sink + + diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_session_record_kafka_to_clickhouse b/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_session_record_kafka_to_clickhouse new file mode 100644 index 0000000..ada2632 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_session_record_kafka_to_clickhouse @@ -0,0 +1,142 @@ +sources: + kafka_source: + type: kafka + # fields: # [array of object] Field List, if not set, all fields(Map) will be output. 
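The JSON_EXTRACT steps above pull data_center and device_group out of the device_tag JSON with the filter expression $.tags[?(@.tag=='...')][0].value. A minimal Python sketch of what that JSONPath is expected to do, written as a plain-dict walk rather than a JSONPath library; the field and tag names come from the configs, the sample values are invented:

```python
import json

def extract_tag(device_tag_json: str, tag_name: str):
    # Mimics $.tags[?(@.tag=='<tag_name>')][0].value: filter the tags array
    # by its "tag" key, take the first match, return its "value".
    doc = json.loads(device_tag_json)
    matches = [t for t in doc.get("tags", []) if t.get("tag") == tag_name]
    return matches[0].get("value") if matches else None

device_tag = '{"tags":[{"tag":"data_center","value":"dc-01"},{"tag":"device_group","value":"edge-01"}]}'
print(extract_tag(device_tag, "data_center"))   # dc-01
print(extract_tag(device_tag, "device_group"))  # edge-01
```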
+ # watermark_timestamp: common_recv_time # [string] Watermark Field Name + # watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms + # watermark_lag: 60 # [number] Watermark Lag, default is 60 + properties: + topic: SESSION-RECORD + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.ssl.keystore.location: + kafka.ssl.keystore.password: + kafka.ssl.truststore.location: + kafka.ssl.truststore.password: + kafka.ssl.key.password: + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.buffer.memory: + kafka.group.id: etl_session_record_kafka_to_clickhouse-20230125 + kafka.auto.offset.reset: latest + kafka.max.request.size: + kafka.compression.type: none + format: json + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + properties: + key: value + functions: # [array of object] Function List + + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [processing_time] + parameters: + precision: seconds + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + 
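Every ClickHouse sink in these jobs (this one continues just below, writing tsg_galaxy_v3.session_record_local) pairs batch.size: 100000 with batch.interval: 30s, i.e. a buffer that flushes on whichever threshold trips first. The actual sink implementation is not shown in this diff; the sketch below only illustrates the semantics those two settings suggest:

```python
import time

class FlushBuffer:
    # Illustrative only: flush when the row count reaches batch.size
    # or the buffer has been open longer than batch.interval.
    def __init__(self, batch_size=100_000, batch_interval_s=30.0):
        self.batch_size = batch_size
        self.batch_interval_s = batch_interval_s
        self.rows, self.opened_at = [], time.monotonic()

    def add(self, row):
        self.rows.append(row)
        full = len(self.rows) >= self.batch_size
        stale = time.monotonic() - self.opened_at >= self.batch_interval_s
        if full or stale:
            batch, self.rows = self.rows, []
            self.opened_at = time.monotonic()
            return batch  # caller inserts this batch into the *_local table
        return None
```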
table: tsg_galaxy_v3.session_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + + env: # [object] Environment Variables + name: etl_session_record_kafka_to_clickhouse # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [etl_processor] + - name: etl_processor + downstream: [clickhouse_sink] + - name: clickhouse_sink + + diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse b/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse new file mode 100644 index 0000000..016650c --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse @@ -0,0 +1,92 @@ +sources: + kafka_source: + type: kafka + properties: + topic: TRAFFIC-SKETCH-METRIC + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_traffic_sketch_metric + kafka.auto.offset.reset: latest + kafka.compression.type: none + format: json + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + functions: # [array of object] Function List + + - function: FLATTEN + lookup_fields: [ fields,tags ] + output_fields: [ ] + parameters: + #prefix: "" + depth: 3 + # delimiter: "." + + - function: RENAME + lookup_fields: [ '' ] + output_fields: [ '' ] + filter: + parameters: + # parent_fields: [tags] + #rename_fields: + # tags: tags + rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key; + + - function: EVAL + output_fields: [ internal_ip ] + parameters: + value_expression: "direction == 'Outbound' ? client_ip : server_ip" + - function: EVAL + output_fields: [ external_ip ] + parameters: + value_expression: "direction == 'Outbound' ? server_ip : client_ip" + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ recv_time ] + parameters: + precision: seconds + + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ log_id ] + filter: + parameters: + data_center_id_num: 1 + + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + table: tsg_galaxy_v3.traffic_sketch_metric_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + + env: # [object] Environment Variables + name: etl_traffic_sketch_metric # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [etl_processor] + - name: etl_processor + downstream: [clickhouse_sink] + - name: clickhouse_sink + + diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_transaction_record_kafka_to_clickhouse b/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_transaction_record_kafka_to_clickhouse new file mode 100644 index 0000000..d4dbd87 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_transaction_record_kafka_to_clickhouse @@ -0,0 +1,140 @@ +sources: + kafka_source: + type: kafka + # fields: # [array of object] Field List, if not set, all fields(Map) will be output. + # watermark_timestamp: common_recv_time # [string] Watermark Field Name + # watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms + # watermark_lag: 60 # [number] Watermark Lag, default is 60 + properties: + topic: TRANSACTION-RECORD + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.ssl.keystore.location: + kafka.ssl.keystore.password: + kafka.ssl.truststore.location: + kafka.ssl.truststore.password: + kafka.ssl.key.password: + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.buffer.memory: + kafka.group.id: etl_transaction_record_kafka_to_clickhouse-20240308 + kafka.auto.offset.reset: latest + kafka.max.request.size: + kafka.compression.type: none + format: json + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + properties: + key: value + functions: # [array of object] Function List + + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [processing_time] + parameters: + precision: seconds + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + 
precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + table: tsg_galaxy_v3.transaction_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + + env: # [object] Environment Variables + name: etl_transaction_record_kafka_to_clickhouse # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [etl_processor] + - name: etl_processor + downstream: [clickhouse_sink] + - name: clickhouse_sink diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_voip_record_kafka_to_clickhouse b/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_voip_record_kafka_to_clickhouse new file mode 100644 index 0000000..d798e94 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/single-cluster-examples/etl_voip_record_kafka_to_clickhouse @@ -0,0 +1,142 @@ +sources: + kafka_source: + type: kafka + # fields: # [array of object] Field List, if not set, all fields(Map) will be output. 
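The commented watermark options above (watermark_timestamp, watermark_timestamp_unit: ms, watermark_lag: 60) describe event-time tracking: the watermark trails the newest observed timestamp by the configured lag, and anything older is treated as late. A small sketch of that bookkeeping, assuming the lag is given in seconds while timestamps are milliseconds, as the defaults suggest:

```python
class Watermark:
    # Tracks max(event_time) - lag, with event timestamps in milliseconds
    # (watermark_timestamp_unit: ms) and lag in seconds (watermark_lag: 60).
    def __init__(self, lag_s: int = 60):
        self.lag_ms = lag_s * 1000
        self.max_seen_ms = 0

    def observe(self, event_ts_ms: int) -> int:
        self.max_seen_ms = max(self.max_seen_ms, event_ts_ms)
        return self.max_seen_ms - self.lag_ms  # records older than this are late

wm = Watermark()
print(wm.observe(1_700_000_120_000))  # 1700000060000
```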
+ # watermark_timestamp: common_recv_time # [string] Watermark Field Name + # watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms + # watermark_lag: 60 # [number] Watermark Lag, default is 60 + properties: + topic: VOIP-CONVERSATION-RECORD + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.ssl.keystore.location: + kafka.ssl.keystore.password: + kafka.ssl.truststore.location: + kafka.ssl.truststore.password: + kafka.ssl.key.password: + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.buffer.memory: + kafka.group.id: etl_voip_record_kafka_to_clickhouse-20231221 + kafka.auto.offset.reset: latest + kafka.max.request.size: + kafka.compression.type: none + format: json + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + properties: + key: value + functions: # [array of object] Function List + + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [processing_time] + parameters: + precision: seconds + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers 
}}" + table: tsg_galaxy_v3.voip_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + + env: # [object] Environment Variables + name: etl_voip_record_kafka_to_clickhouse # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [etl_processor] + - name: etl_processor + downstream: [clickhouse_sink] + - name: clickhouse_sink + + diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/data_transporter.yaml.j2 b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/data_transporter.yaml.j2 new file mode 100644 index 0000000..e5c6273 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/data_transporter.yaml.j2 @@ -0,0 +1,54 @@ +sources: + kafka_source: + type: kafka + properties: + topic: {{ kafka_source_topic }} + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: raw + +sinks: + kafka_sink: + type: kafka + properties: + topic: {{ kafka_sink_topic }} + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: raw + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_servers }} + table: tsg_galaxy_v3.traffic_sketch_metric_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + topology: + {{ topology }} + + diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/datapath_telemetry_record.yaml.j2 b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/datapath_telemetry_record.yaml.j2 new file mode 100644 index 0000000..58da17c --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/datapath_telemetry_record.yaml.j2 @@ -0,0 +1,77 @@ +sources: + kafka_source: + type: kafka + properties: + topic: DATAPATH-TELEMETRY-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 
454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: msgpack + +processing_pipelines: + etl_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + functions: + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ log_id ] + parameters: + data_center_id_num: {{ data_center_id_num }} + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ recv_time ] + parameters: + precision: seconds + - function: BASE64_ENCODE_TO_STRING + output_fields: [ packet ] + parameters: + value_field: packet + +sinks: + kafka_sink: + type: kafka + properties: + topic: DATAPATH-TELEMETRY-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: raw + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.datapath_telemetry_record_local + batch.size: 5000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + topology: + {{ topology }} + + diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/dos_event_kafka_to_clickhouse b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/dos_event_kafka_to_clickhouse new file mode 100644 index 0000000..49a3d1a --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/dos_event_kafka_to_clickhouse @@ -0,0 +1,49 @@ +sources: + kafka_source: + type: kafka + properties: + topic: DOS-EVENT + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.ssl.keystore.location: + kafka.ssl.keystore.password: + kafka.ssl.truststore.location: + kafka.ssl.truststore.password: + kafka.ssl.key.password: + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.buffer.memory: + kafka.group.id: dos_event_kafka_to_clickhouse-20231221 + kafka.auto.offset.reset: latest + kafka.max.request.size: + kafka.compression.type: none + format: json + + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + table: tsg_galaxy_v3.dos_event_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + 
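The SNOWFLAKE_ID function appears in every pipeline here to mint log_id, parameterized by data_center_id_num so IDs stay distinct across sites. The exact bit layout of groot-stream's implementation is not documented in these files; the sketch below assumes the classic snowflake split (41-bit millisecond timestamp, 5-bit datacenter, 5-bit worker, 12-bit sequence) purely to show how the datacenter number ends up embedded in each ID:

```python
import time

class SnowflakeSketch:
    # Assumed layout: | 41b timestamp(ms) | 5b datacenter | 5b worker | 12b sequence |
    def __init__(self, data_center_id_num: int, worker_id: int = 0):
        assert 0 <= data_center_id_num < 32 and 0 <= worker_id < 32
        self.dc, self.worker = data_center_id_num, worker_id
        self.last_ms, self.seq = -1, 0

    def next_id(self) -> int:
        now = int(time.time() * 1000)
        if now == self.last_ms:
            self.seq = (self.seq + 1) & 0xFFF  # 4096 ids per ms per worker
            if self.seq == 0:                  # sequence exhausted: wait for next ms
                while now <= self.last_ms:
                    now = int(time.time() * 1000)
        else:
            self.seq = 0
        self.last_ms = now
        return (now << 22) | (self.dc << 17) | (self.worker << 12) | self.seq

print(SnowflakeSketch(data_center_id_num=1).next_id())
```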
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + env: + name: dos_event_kafka_to_clickhouse + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [clickhouse_sink] + - name: clickhouse_sink + + diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/proxy_event.yaml.j2 b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/proxy_event.yaml.j2 new file mode 100644 index 0000000..cee0548 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/proxy_event.yaml.j2 @@ -0,0 +1,146 @@ +sources: + kafka_source: + type: kafka + properties: + topic: PROXY-EVENT + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + functions: + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: {{ data_center_id_num }} + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: DOMAIN + lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni] + output_fields: [server_domain] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + 
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + +sinks: + kafka_sink: + type: kafka + properties: + topic: PROXY-EVENT + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.proxy_event_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + topology: + {{ topology }} + diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/session_record.yaml.j2 b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/session_record.yaml.j2 new file mode 100644 index 0000000..8938cd2 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/session_record.yaml.j2 @@ -0,0 +1,144 @@ +sources: + kafka_source: + type: kafka + properties: + topic: SESSION-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + functions: + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: {{ data_center_id_num }} + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: DOMAIN + lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni] + output_fields: [server_domain] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + 
charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + +sinks: + kafka_sink: + type: kafka + properties: + topic: SESSION-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.session_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + {{ topology }} \ No newline at end of file diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/traffic_sketch_metric.yaml.j2 b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/traffic_sketch_metric.yaml.j2 new file mode 100644 index 0000000..0a048a0 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/traffic_sketch_metric.yaml.j2 @@ -0,0 +1,106 @@ +sources: + kafka_source: + type: kafka + properties: + topic: TRAFFIC-SKETCH-METRIC + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_traffic_sketch_metric + kafka.auto.offset.reset: latest + kafka.compression.type: none + format: json + 
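The FLATTEN/RENAME pair that opens the pipeline below first flattens the nested fields/tags maps (to depth 3), then strips the resulting tags. and fields. prefixes so the metric lands in flat columns. The same transformation in plain Python, where the rename_expression's string.replace_all corresponds to str.replace:

```python
def flatten(obj, prefix="", depth=3, sep="."):
    # FLATTEN: expand nested maps up to `depth`, joining keys with `sep`.
    out = {}
    for k, v in obj.items():
        key = f"{prefix}{sep}{k}" if prefix else k
        if isinstance(v, dict) and depth > 1:
            out.update(flatten(v, key, depth - 1, sep))
        else:
            out[key] = v
    return out

def rename(record):
    # RENAME: key = string.replace_all(key,'tags.',''); same for 'fields.'
    return {k.replace("tags.", "").replace("fields.", ""): v for k, v in record.items()}

metric = {"tags": {"direction": "Outbound"}, "fields": {"sessions": 42}}
print(rename(flatten(metric)))  # {'direction': 'Outbound', 'sessions': 42}
```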
+processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + functions: # [array of object] Function List + + - function: FLATTEN + lookup_fields: [ fields,tags ] + output_fields: [ ] + parameters: + #prefix: "" + depth: 3 + # delimiter: "." + + - function: RENAME + lookup_fields: [ '' ] + output_fields: [ '' ] + filter: + parameters: + # parent_fields: [tags] + #rename_fields: + # tags: tags + rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key; + + - function: EVAL + output_fields: [ internal_ip ] + parameters: + value_expression: "direction == 'Outbound' ? client_ip : server_ip" + - function: EVAL + output_fields: [ external_ip ] + parameters: + value_expression: "direction == 'Outbound' ? server_ip : client_ip" + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ recv_time ] + parameters: + precision: seconds + + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ log_id ] + filter: + parameters: + data_center_id_num: 1 + + +sinks: + kafka_sink: + type: kafka + properties: + topic: TRAFFIC-SKETCH-METRIC + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_servers }} + table: tsg_galaxy_v3.traffic_sketch_metric_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + + +application: + + env: # [object] Environment Variables + name: etl_traffic_sketch_metric # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + {{ topology }} + + diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/transaction_record.yaml.j2 b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/transaction_record.yaml.j2 new file mode 100644 index 0000000..9c8fa4b --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/transaction_record.yaml.j2 @@ -0,0 +1,144 @@ +sources: + kafka_source: + type: kafka + properties: + topic: TRANSACTION-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: 
com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + functions: + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: {{ data_center_id_num }} + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: DOMAIN + lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni] + output_fields: [server_domain] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + +sinks: + kafka_sink: + type: kafka + properties: + topic: TRANSACTION-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.transaction_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + 
object-reuse: true + {{ topology }} diff --git a/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/voip_record.yaml.j2 b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/voip_record.yaml.j2 new file mode 100644 index 0000000..1a561f2 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.06/groot_stream/templates/voip_record.yaml.j2 @@ -0,0 +1,144 @@ +sources: + kafka_source: + type: kafka + properties: + topic: VOIP-CONVERSATION-RECORD + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: {{ kafka_source_group_id }} + kafka.auto.offset.reset: latest + format: json + json.ignore.parse.errors: false + +processing_pipelines: + etl_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + functions: + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: {{ data_center_id_num }} + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: DOMAIN + lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni] + output_fields: [server_domain] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [ processing_time ] + parameters: + precision: seconds + +sinks: + kafka_sink: + type: kafka + properties: + topic: 
VOIP-CONVERSATION-RECORD + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + json.ignore.parse.errors: false + log.failures.only: true + + clickhouse_sink: + type: clickhouse + properties: + host: {{ clickhouse_sink_host }} + table: tsg_galaxy_v3.voip_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: {{ job_name }} + shade.identifier: aes + pipeline: + object-reuse: true + {{ topology }} \ No newline at end of file
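All of the *.yaml.j2 files above are plain Jinja2 templates: deployment fills in the Kafka/ClickHouse server lists, consumer group ids, data_center_id_num, job name, and a topology block. Judging from the templates that end with a bare {{ topology }} under pipeline:, the topology variable appears to carry the whole topology: block verbatim. A rendering sketch; the variable values are placeholders, not real deployment settings:

```python
from jinja2 import Template  # pip install jinja2

params = {
    "kafka_source_servers": "kafka-1:9092",        # placeholder
    "kafka_sink_servers": "kafka-2:9092",          # placeholder
    "clickhouse_sink_host": "clickhouse-1:8123",   # placeholder
    "kafka_source_group_id": "etl_voip_record_kafka_to_clickhouse",
    "data_center_id_num": 1,
    "job_name": "etl_voip_record_kafka_to_clickhouse",
    # Assumed: the topology variable is a pre-rendered YAML block
    # spliced in verbatim under pipeline:.
    "topology": (
        "topology:\n"
        "      - name: kafka_source\n"
        "        downstream: [etl_processor]\n"
        "      - name: etl_processor\n"
        "        downstream: [clickhouse_sink]\n"
        "      - name: clickhouse_sink"
    ),
}

with open("voip_record.yaml.j2") as f:
    print(Template(f.read()).render(**params))
```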