Merge remote-tracking branch 'origin/master'

This commit is contained in:
wangchengcheng
2024-07-02 15:28:02 +08:00
55 changed files with 2767 additions and 267 deletions

View File

@@ -1,2 +1,4 @@
v1.2.4 (2024-04-08)
https://git.mesalab.cn/galaxy/platform/groot-stream/-/releases/v1.2.4
groot-stream version > 1.4.0
etl_session_record_kafka_to_cn_kafka needs to choose the source topic according to the deployment environment: SESSION-RECORD or SESSION-RECORD-PROCESSED
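A minimal sketch of how that per-environment choice could be wired into the deployment variables (assuming the templates are rendered by Ansible/Jinja2; the variable name is taken from the session-record template that follows, while the file layout and values are illustrative):

# group_vars/ndc.yml — national-center deployment consumes the pre-processed topic (illustrative)
tsg_olap_kafka_session_record_or_session_record_processed_topic: SESSION-RECORD-PROCESSED

# group_vars/centralized.yml — centralized deployment reads the raw topic (illustrative)
tsg_olap_kafka_session_record_or_session_record_processed_topic: SESSION-RECORD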

View File

@@ -3,7 +3,7 @@ sources:
type: kafka
# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties: # [object] Source Properties
topic: SESSION-RECORD-PROCESSED
topic: {{ tsg_olap_kafka_session_record_or_session_record_processed_topic }} # SESSION-RECORD/SESSION-RECORD-PROCESSED
kafka.bootstrap.servers: {{ tsg_olap_kafka_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
@@ -11,7 +11,7 @@ sources:
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: etl_processed_session_record_kafka_to_cn_kafka
kafka.group.id: etl_session_record_kafka_to_cn_kafka
kafka.auto.offset.reset: latest
format: json
@@ -21,6 +21,28 @@ processing_pipelines:
remove_fields:
output_fields:
functions: # [array of object] Function List
- function: SNOWFLAKE_ID
lookup_fields: [ '' ]
output_fields: [ cn_log_id ]
parameters:
data_center_id_num: 1
- function: EVAL
output_fields: [ log_id ]
parameters:
value_expression: "log_id == null ? cn_log_id : log_id"
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ __timestamp ]
output_fields: [ kafka_recv_time ]
parameters:
precision: seconds
- function: EVAL
output_fields: [ recv_time ]
parameters:
value_expression: "recv_time == null ? kafka_recv_time : recv_time"
- function: EVAL
output_fields: [ domain ]
parameters:
@@ -261,19 +283,19 @@ processing_pipelines:
parameters:
value_expression: "client_zone == 'external' ? sessions : external_query_num"
- function: CN_VPN_LOOKUP
- function: CN_ANONYMITY_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_vpn_service_name ]
output_fields: [ server_node_type ]
parameters:
kb_name: cn_vpn_learning_ip
option: IP_TO_VPN
kb_name: cn_ioc_darkweb
option: IP_TO_NODE_TYPE
- function: CN_VPN_LOOKUP
- function: CN_ANONYMITY_LOOKUP
lookup_fields: [ domain ]
output_fields: [ domain_vpn_service_name ]
output_fields: [ domain_node_type ]
parameters:
kb_name: cn_vpn_learning_domain
option: DOMAIN_TO_VPN
kb_name: cn_ioc_darkweb
option: DOMAIN_TO_NODE_TYPE
- function: CN_IOC_LOOKUP
lookup_fields: [ server_ip ]
@@ -289,69 +311,39 @@ processing_pipelines:
kb_name: cn_ioc_malware
option: DOMAIN_TO_MALWARE
- function: CN_USER_DEFINE_TAG_LOOKUP
- function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ client_ip ]
output_fields: [ client_ip_tags ]
parameters:
kb_name: cn_ip_tag_user_define
kb_name: cn_intelligence_indicator
option: IP_TO_TAG
- function: CN_USER_DEFINE_TAG_LOOKUP
- function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_ip_tags ]
parameters:
kb_name: cn_ip_tag_user_define
kb_name: cn_intelligence_indicator
option: IP_TO_TAG
- function: CN_USER_DEFINE_TAG_LOOKUP
- function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ domain ]
output_fields: [ domain_tags ]
parameters:
kb_name: cn_domain_tag_user_define
kb_name: cn_intelligence_indicator
option: DOMAIN_TO_TAG
- function: CN_USER_DEFINE_TAG_LOOKUP
lookup_fields: [ app ]
output_fields: [ app_tags ]
parameters:
kb_name: cn_app_tag_user_define
option: APP_TO_TAG
- function: GENERATE_STRING_ARRAY
lookup_fields: [ client_idc_renter,client_ip_tags ]
output_fields: [ client_ip_tags ]
- function: GENERATE_STRING_ARRAY
lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ]
lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_ip_tags ]
output_fields: [ server_ip_tags ]
- function: GENERATE_STRING_ARRAY
lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ]
lookup_fields: [ domain_node_type,domain_malware,domain_tags ]
output_fields: [ domain_tags ]
- function: CN_ARRAY_ELEMENTS_PREPEND
lookup_fields: [ client_ip_tags ]
output_fields: [ client_ip_tags ]
parameters:
prefix: ip.
- function: CN_ARRAY_ELEMENTS_PREPEND
lookup_fields: [ server_ip_tags ]
output_fields: [ server_ip_tags ]
parameters:
prefix: ip.
- function: CN_ARRAY_ELEMENTS_PREPEND
lookup_fields: [ domain_tags ]
output_fields: [ domain_tags ]
parameters:
prefix: domain.
- function: CN_ARRAY_ELEMENTS_PREPEND
lookup_fields: [ app_tags ]
output_fields: [ app_tags ]
parameters:
prefix: app.
postprocessing_pipelines:
remove_field_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
@@ -377,7 +369,7 @@ sinks:
application:
env:
name: etl_session_record_processed_kafka_to_cn_kafka
name: etl_session_record_kafka_to_cn_kafka
shade.identifier: aes
pipeline:
object-reuse: true

View File

@@ -60,18 +60,6 @@ grootstream:
files:
- 12
- name: cn_vpn_learning_ip
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base
files:
- 15
- name: cn_vpn_learning_domain
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base
files:
- 14
- name: cn_ioc_darkweb
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base
@@ -84,17 +72,11 @@ grootstream:
files:
- 7
- name: cn_ip_tag_user_define
- name: cn_intelligence_indicator
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_ip_tag_user_defined
- name: cn_domain_tag_user_define
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_domain_tag_user_defined
- name: cn_app_tag_user_define
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_app_tag_user_defined
fs_path: http://192.168.44.55:9999/v1/knowledge_base
files:
- 16
- name: cn_rule
fs_type: http

View File

@@ -14,6 +14,8 @@ com.geedgenetworks.core.udf.cn.IpZoneLookup
com.geedgenetworks.core.udf.cn.VpnLookup
com.geedgenetworks.core.udf.cn.AnonymityLookup
com.geedgenetworks.core.udf.cn.IocLookup
com.geedgenetworks.core.udf.cn.UserDefineTagLookup
com.geedgenetworks.core.udf.cn.FieldsMerge
com.geedgenetworks.core.udf.cn.ArrayElementsPrepend
com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup
com.geedgenetworks.core.udf.SnowflakeId
com.geedgenetworks.core.udf.UnixTimestampConverter

View File

@@ -0,0 +1,12 @@
# session-record-cn
#cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence
# pre-metrics
cn.pre.metric.class=com.zdjizhi.pre.base.CnPreMetric
# relation
cn.pre.relation.metric.class=com.zdjizhi.pre.relation.CnRelationMetric
# dns-metrics
cn.dns.pre.metric.class=com.zdjizhi.pre.dns.DnsPreMetric
# detection
cn.detection.indicator.class=com.zdjizhi.schedule.indicator.IndicatorSchedule
# location
cn.location.metric.class=com.zdjizhi.pre.location.LocationMetric

View File

@@ -0,0 +1,41 @@
# job name
stream.execution.job.name=cn_stream
# default parallelism
stream.execution.environment.parallelism={{ flink.cn_stream.parallelism }}
# kafka source parallelism
session.record.completed.parallelism={{ flink.cn_stream.parallelism }}
# session-record-cn sink parallelism
cn.record.parallelism={{ flink.cn_stream.parallelism }}
# pre-metrics sink parallelism
metric.output.parallelism={{ flink.cn_stream.parallelism }}
# dns-metrics sink parallelism
dns.metric.output.parallelism={{ flink.cn_stream.parallelism }}
# relation sink parallelism
metric.entity.relation.output.parallelism={{ flink.cn_stream.parallelism }}
# dynamic attribute sink parallelism
metric.dynamic.attribute.output.parallelism={{ flink.cn_stream.parallelism }}
# subscriber-app relation sink parallelism
metric.subscriber.app.relation.output.parallelism={{ flink.cn_stream.parallelism }}
# location sink parallelism
location.metric.output.parallelism={{ flink.cn_stream.parallelism }}
# kafka consumer
kafka.input.bootstrap.servers={{ kafka_source_servers }}
session.record.completed.topic=SESSION-RECORD-CN
session.record.completed.group.id=session-record-cn-stream
# kafka consumer sasl 0:off 1:on
input.sasl.jaas.config.flag=1
# clickhouse
clickhouse.address={{ clickhouse_servers }}
clickhouse.user=LXDp+zqdQqDIIqaDfqsKoA==
clickhouse.password=RY+0nruXpPqITsQ3ob4P7Qbd8W246+Pa
clickhouse.config.connect_timeout=30
clickhouse.config.query_timeout=300
# flink checkpoint 0:off 1:on
flink.enable.checkpoint.flag=0
# api detection url
rule.full.url=http://{{ vrrp_instance.cn_ui.virtual_ipaddress }}:8090/v1/rule/detection
rule.inc.url=http://{{ vrrp_instance.cn_ui.virtual_ipaddress }}:8090/v1/rule/detection/increase
# gateway host
gateway.host={{ vrrp_instance.default.virtual_ipaddress }}
# watermark
watermark.seconds=1
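For reference, the `{{ ... }}` placeholders in this properties file are Jinja-style template variables; a hypothetical vars file that would render it could look like the following (key names are taken from the template above, all values are illustrative):

# illustrative values — the actual inventory layout may differ
flink:
  cn_stream:
    parallelism: 4
kafka_source_servers: 192.168.44.11:9094,192.168.44.13:9094
clickhouse_servers: 192.168.44.13:9001,192.168.44.14:9001
vrrp_instance:
  cn_ui:
    virtual_ipaddress: 192.168.44.200
  default:
    virtual_ipaddress: 192.168.44.201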

View File

@@ -1,4 +1,4 @@
groot-stream version > 1.3.0
groot-stream version > 1.4.0
etl_session_record_kafka_to_cn_kafka needs to choose the source topic according to the deployment environment: SESSION-RECORD or SESSION-RECORD-PROCESSED

View File

@@ -43,6 +43,17 @@ processing_pipelines:
parameters:
value_expression: "recv_time == null ? kafka_recv_time : recv_time"
- function: DOMAIN
lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ]
output_fields: [ cn_server_domain ]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: EVAL
output_fields: [ server_domain ]
parameters:
value_expression: "server_domain == null ? cn_server_domain : server_domain"
- function: EVAL
output_fields: [ domain ]
parameters:

View File

@@ -19,3 +19,4 @@ com.geedgenetworks.core.udf.cn.ArrayElementsPrepend
com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup
com.geedgenetworks.core.udf.SnowflakeId
com.geedgenetworks.core.udf.UnixTimestampConverter
com.geedgenetworks.core.udf.Domain

View File

@@ -5,4 +5,5 @@ create table IF NOT EXISTS `system`.query_log_cluster ON CLUSTER ck_cluster as `
CREATE TABLE IF NOT EXISTS `system`.columns_cluster ON CLUSTER ck_cluster AS `system`.columns ENGINE=Distributed(ck_cluster,`system`,columns,rand());
CREATE TABLE IF NOT EXISTS `system`.processes_cluster ON CLUSTER ck_cluster AS `system`.processes ENGINE=Distributed(ck_cluster,`system`,processes,rand());
alter table system.query_log on cluster ck_cluster modify TTL event_date + INTERVAL 60 DAY;
create table IF not EXISTS system.distributed_ddl_queue_cluster ON CLUSTER ck_cluster as system.distributed_ddl_queue ENGINE =Distributed(ck_cluster,`system`,distributed_ddl_queue,rand());

View File

@@ -3208,9 +3208,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record_local on clus
device_group String,
traffic_link_id Int32,
source_ip String,
source_port Int32,
source_port Nullable(Int32),
destination_ip String,
destination_port Int32,
destination_port Nullable(Int32),
packet String,
packet_length Int32,
measurements String
@@ -3229,9 +3229,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record on cluster ck
device_group String,
traffic_link_id Int32,
source_ip String,
source_port Int32,
source_port Nullable(Int32),
destination_ip String,
destination_port Int32,
destination_port Nullable(Int32),
packet String,
packet_length Int32,
measurements String

View File

@@ -4282,9 +4282,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record_local on clus
device_group String,
traffic_link_id Int32,
source_ip String,
source_port Int32,
source_port Nullable(Int32),
destination_ip String,
destination_port Int32,
destination_port Nullable(Int32),
packet String,
packet_length Int32,
measurements String
@@ -4303,9 +4303,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record on cluster ck
device_group String,
traffic_link_id Int32,
source_ip String,
source_port Int32,
source_port Nullable(Int32),
destination_ip String,
destination_port Int32,
destination_port Nullable(Int32),
packet String,
packet_length Int32,
measurements String
@@ -4325,9 +4325,9 @@ ENGINE = Distributed('ck_cluster',
device_group String,
traffic_link_id Int32,
source_ip String,
source_port Int32,
source_port Nullable(Int32),
destination_ip String,
destination_port Int32,
destination_port Nullable(Int32),
packet String,
packet_length Int32,
measurements String

View File

@@ -1,26 +1,31 @@
## session_record.yaml.template
- etl_session_record_kafka_to_ndc_kafka (A-DT) // multi-center deployment: branch-center Data Transporter pre-processes, then aggregates centrally to the national center (NDC)
- Topology: kafka_source -> etl_processor -> kafka_sink
- Data Flow: SESSION-RECORD -> SESSION-RECORD-PROCESSED
- session_record_processed_kafka_to_clickhouse(A-NDC) // multi-center deployment: the national-center side loads session records into ClickHouse
- Topology: kafka_source -> clickhouse_sink
- Data Flow: SESSION-RECORD-PROCESSED -> session_record_local
- etl_session_record_kafka_to_clickhouse (B) // centralized deployment: ingest session records, pre-process, then write to ClickHouse
- Topology: kafka_source -> etl_processor -> clickhouse_sink
- Data Flow: SESSION-RECORD -> session_record_local
# Configuration template examples
## realtime_log_streaming_cn_session_record.yaml.template
## session_record.yaml.j2 (session-record ETL scenarios)
- Multi-center deployment: branch-center Data Transporter pre-processes, then aggregates centrally to the national center (NDC)
- etl_session_record_kafka_to_ndc_kafka (A-DT)
- Topology: kafka_source -> etl_processor -> kafka_sink
- Data Flow: SESSION-RECORD -> SESSION-RECORD-PROCESSED
- Multi-center deployment: the national-center side loads session records into ClickHouse
- session_record_processed_kafka_to_clickhouse(A-NDC)
- Topology: kafka_source -> clickhouse_sink
- Data Flow: SESSION-RECORD-PROCESSED -> session_record_local
- Centralized deployment: ingest session records, pre-process, then write to ClickHouse
- etl_session_record_kafka_to_clickhouse (B)
- Topology: kafka_source -> etl_processor -> clickhouse_sink
- Data Flow: SESSION-RECORD -> session_record_local
## data_transporter.yaml.j2 (data backhaul scenario)
- troubleshooting_file_stream_kafka_to_ndc_kafka
- Topology: kafka_source -> kafka_sink (format:raw)
- Data Flow: TROUBLESHOOTING-FILE-STREAM-RECORD -> TROUBLESHOOTING-FILE-STREAM-RECORD
## realtime_log_streaming_cn_session_record.yaml.template (push to other vendors / third parties)
`install_cn_udf.sh` installs the CN UDFs; `grootstream.yaml` defines the CN knowledge bases (topology declaration sketched below)
- etl_session_record_kafka_to_cn_kafka
- Topology: kafka_source -> etl_processor -> post_output_field_processor -> kafka_sink
- Data Flow: SESSION-RECORD(SESSION-RECORD-PROCESSED) -> SESSION-RECORD-CN
## data_transporter.yaml.template
- troubleshooting_file_stream_kafka_to_ndc_kafka
- Topology: kafka_source -> kafka_sink (format:raw)
- Data Flow: TROUBLESHOOTING-FILE-STREAM-RECORD -> TROUBLESHOOTING-FILE-STREAM-RECORD
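The Topology lines above map onto the application.topology section used throughout these job YAMLs; as a sketch, the etl_session_record_kafka_to_cn_kafka chain described above would be declared roughly as:

topology:
  - name: kafka_source
    downstream: [ etl_processor ]
  - name: etl_processor
    downstream: [ post_output_field_processor ]
  - name: post_output_field_processor
    downstream: [ kafka_sink ]
  - name: kafka_sink
    downstream: [ ]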

View File

@@ -1,18 +0,0 @@
grootstream:
knowledge_base:
# - name: tsg_ip_asn
# fs_type: http
# fs_path: http://192.168.44.12:9999/v1/knowledge_base
# files:
# - f9f6bc91-2142-4673-8249-e097c00fe1ea
- name: tsg_ip_asn
fs_type: local
fs_path: /data/hdd/olap/flink/topology/data/
files:
- asn_builtin.mmdb
properties:
hos.path: http://192.168.44.12:9098/hos
hos.bucket.name.traffic_file: traffic_file_bucket
hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
scheduler.knowledge_base.update.interval.minutes: 5

View File

@@ -73,11 +73,6 @@ processing_pipelines:
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]

View File

@@ -73,12 +73,6 @@ processing_pipelines:
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:

View File

@@ -77,12 +77,6 @@ processing_pipelines:
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:

View File

@@ -73,11 +73,6 @@ processing_pipelines:
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]

View File

@@ -383,5 +383,3 @@ application:
downstream: [ kafka_sink ]
- name: kafka_sink
downstream: [ ]

View File

@@ -3,7 +3,7 @@ sources:
type: kafka
properties:
topic: DOS-EVENT
kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -27,7 +27,7 @@ sinks:
clickhouse_sink:
type: clickhouse
properties:
host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.dos_event_local
batch.size: 100000
batch.interval: 30s

View File

@@ -3,7 +3,7 @@ sources:
type: kafka
properties:
topic: DATAPATH-TELEMETRY-RECORD
kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -46,7 +46,7 @@ sinks:
clickhouse_sink:
type: clickhouse
properties:
host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.datapath_telemetry_record_local
batch.size: 5000
batch.interval: 30s

View File

@@ -7,7 +7,7 @@ sources:
# watermark_lag: 60 # [number] Watermark Lag, default is 60
properties:
topic: PROXY-EVENT
kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -71,12 +71,6 @@ processing_pipelines:
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
@@ -123,7 +117,7 @@ sinks:
clickhouse_sink:
type: clickhouse
properties:
host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.proxy_event_local
batch.size: 100000
batch.interval: 30s

View File

@@ -7,7 +7,7 @@ sources:
# watermark_lag: 60 # [number] Watermark Lag, default is 60
properties:
topic: SESSION-RECORD
kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -71,12 +71,6 @@ processing_pipelines:
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
@@ -123,7 +117,7 @@ sinks:
clickhouse_sink:
type: clickhouse
properties:
host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.session_record_local
batch.size: 100000
batch.interval: 30s

View File

@@ -0,0 +1,92 @@
sources:
kafka_source:
type: kafka
properties:
topic: TRAFFIC-SKETCH-METRIC
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: etl_traffic_sketch_metric
kafka.auto.offset.reset: latest
kafka.compression.type: none
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
functions: # [array of object] Function List
- function: FLATTEN
lookup_fields: [ fields,tags ]
output_fields: [ ]
parameters:
#prefix: ""
depth: 3
# delimiter: "."
- function: RENAME
lookup_fields: [ '' ]
output_fields: [ '' ]
filter:
parameters:
# parent_fields: [tags]
#rename_fields:
# tags: tags
rename_expression: key = string.replace_all(key,'tags.',''); key = string.replace_all(key,'fields.',''); return key;
- function: EVAL
output_fields: [ internal_ip ]
parameters:
value_expression: "direction == 'Outbound' ? client_ip : server_ip"
- function: EVAL
output_fields: [ external_ip ]
parameters:
value_expression: "direction == 'Outbound' ? server_ip : client_ip"
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ timestamp_ms ]
output_fields: [ recv_time ]
parameters:
precision: seconds
- function: SNOWFLAKE_ID
lookup_fields: [ '' ]
output_fields: [ log_id ]
filter:
parameters:
data_center_id_num: 1
sinks:
clickhouse_sink:
type: clickhouse
properties:
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.traffic_sketch_metric_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
application:
env: # [object] Environment Variables
name: etl_traffic_sketch_metric # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink

View File

@@ -7,7 +7,7 @@ sources:
# watermark_lag: 60 # [number] Watermark Lag, default is 60
properties:
topic: TRANSACTION-RECORD
kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -71,12 +71,6 @@ processing_pipelines:
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
@@ -123,7 +117,7 @@ sinks:
clickhouse_sink:
type: clickhouse
properties:
host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.transaction_record_local
batch.size: 100000
batch.interval: 30s

View File

@@ -7,7 +7,7 @@ sources:
# watermark_lag: 60 # [number] Watermark Lag, default is 60
properties:
topic: VOIP-CONVERSATION-RECORD
kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -71,12 +71,6 @@ processing_pipelines:
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
@@ -123,7 +117,7 @@ sinks:
clickhouse_sink:
type: clickhouse
properties:
host: 192.168.44.13:9001,192.168.44.14:9001,192.168.44.15:9001,192.168.44.16:9001
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.voip_record_local
batch.size: 100000
batch.interval: 30s

View File

@@ -3,7 +3,7 @@ sources:
type: kafka
properties:
topic: {{ kafka_source_topic }}
kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -19,7 +19,7 @@ sinks:
type: kafka
properties:
topic: {{ kafka_sink_topic }}
kafka.bootstrap.servers: { { kafka_sink_bootstrap_servers } }
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
@@ -32,6 +32,16 @@ sinks:
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: raw
clickhouse_sink:
type: clickhouse
properties:
host: {{ clickhouse_servers }}
table: tsg_galaxy_v3.traffic_sketch_metric_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
application:
env:
name: {{ job_name }}
@@ -39,9 +49,6 @@ application:
pipeline:
object-reuse: true
topology:
- name: kafka_source
downstream: [kafka_sink]
- name: kafka_sink
downstream: []
{{ topology }}
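The hard-coded topology block is dropped in favor of a `{{ topology }}` placeholder, presumably rendered per job at deployment time. For this kafka-to-kafka template, the rendered value would plausibly match the block it replaces:

topology:
  - name: kafka_source
    downstream: [ kafka_sink ]
  - name: kafka_sink
    downstream: [ ]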

View File

@@ -2,8 +2,8 @@ sources:
kafka_source:
type: kafka
properties:
topic: {{ kafka_source_topic }}
kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
topic: DATAPATH-TELEMETRY-RECORD
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -34,6 +34,25 @@ processing_pipelines:
value_field: packet
sinks:
kafka_sink:
type: kafka
properties:
topic: DATAPATH-TELEMETRY-RECORD
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: raw
json.ignore.parse.errors: false
log.failures.only: true
clickhouse_sink:
type: clickhouse
properties:
@@ -53,11 +72,6 @@ application:
pipeline:
object-reuse: true
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink
downstream: []
{{ topology }}

View File

@@ -0,0 +1,49 @@
sources:
kafka_source:
type: kafka
properties:
topic: DOS-EVENT
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.ssl.keystore.location:
kafka.ssl.keystore.password:
kafka.ssl.truststore.location:
kafka.ssl.truststore.password:
kafka.ssl.key.password:
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.buffer.memory:
kafka.group.id: dos_event_kafka_to_clickhouse-20231221
kafka.auto.offset.reset: latest
kafka.max.request.size:
kafka.compression.type: none
format: json
sinks:
clickhouse_sink:
type: clickhouse
properties:
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.dos_event_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
application:
env:
name: dos_event_kafka_to_clickhouse
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [clickhouse_sink]
- name: clickhouse_sink

View File

@@ -2,8 +2,8 @@ sources:
kafka_source:
type: kafka
properties:
topic: {{ kafka_source_topic }}
kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
topic: PROXY-EVENT
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -107,8 +107,8 @@ sinks:
kafka_sink:
type: kafka
properties:
topic: {{ kafka_sink_topic }}
kafka.bootstrap.servers: { { kafka_sink_bootstrap_servers } }
topic: PROXY-EVENT
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
@@ -142,11 +142,5 @@ application:
pipeline:
object-reuse: true
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink
downstream: []
{{ topology }}

View File

@@ -2,8 +2,8 @@ sources:
kafka_source:
type: kafka
properties:
topic: {{ kafka_source_topic }}
kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
topic: SESSION-RECORD
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -107,8 +107,8 @@ sinks:
kafka_sink:
type: kafka
properties:
topic: {{ kafka_sink_topic }}
kafka.bootstrap.servers: { { kafka_sink_bootstrap_servers } }
topic: SESSION-RECORD
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
@@ -141,12 +141,4 @@ application:
shade.identifier: aes
pipeline:
object-reuse: true
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink
downstream: []
{{ topology }}

View File

@@ -0,0 +1,106 @@
sources:
kafka_source:
type: kafka
properties:
topic: TRAFFIC-SKETCH-METRIC
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: etl_traffic_sketch_metric
kafka.auto.offset.reset: latest
kafka.compression.type: none
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
functions: # [array of object] Function List
- function: FLATTEN
lookup_fields: [ fields,tags ]
output_fields: [ ]
parameters:
#prefix: ""
depth: 3
# delimiter: "."
- function: RENAME
lookup_fields: [ '' ]
output_fields: [ '' ]
filter:
parameters:
# parent_fields: [tags]
#rename_fields:
# tags: tags
rename_expression: key = string.replace_all(key,'tags.',''); key = string.replace_all(key,'fields.',''); return key;
- function: EVAL
output_fields: [ internal_ip ]
parameters:
value_expression: "direction == 'Outbound' ? client_ip : server_ip"
- function: EVAL
output_fields: [ external_ip ]
parameters:
value_expression: "direction == 'Outbound' ? server_ip : client_ip"
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ timestamp_ms ]
output_fields: [ recv_time ]
parameters:
precision: seconds
- function: SNOWFLAKE_ID
lookup_fields: [ '' ]
output_fields: [ log_id ]
filter:
parameters:
data_center_id_num: 1
sinks:
kafka_sink:
type: kafka
properties:
topic: TRAFFIC-SKETCH-METRIC
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: json
json.ignore.parse.errors: false
log.failures.only: true
clickhouse_sink:
type: clickhouse
properties:
host: {{ clickhouse_servers }}
table: tsg_galaxy_v3.traffic_sketch_metric_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
application:
env: # [object] Environment Variables
name: etl_traffic_sketch_metric # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
{{ topology }}

View File

@@ -2,8 +2,8 @@ sources:
kafka_source:
type: kafka
properties:
topic: {{ kafka_source_topic }}
kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
topic: TRANSACTION-RECORD
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -107,8 +107,8 @@ sinks:
kafka_sink:
type: kafka
properties:
topic: {{ kafka_sink_topic }}
kafka.bootstrap.servers: { { kafka_sink_bootstrap_servers } }
topic: TRANSACTION-RECORD
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
@@ -141,12 +141,4 @@ application:
shade.identifier: aes
pipeline:
object-reuse: true
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink
downstream: []
{{ topology }}

View File

@@ -2,8 +2,8 @@ sources:
kafka_source:
type: kafka
properties:
topic: {{ kafka_source_topic }}
kafka.bootstrap.servers: {{ kafka_source_bootstrap_servers }}
topic: VOIP-CONVERSATION-RECORD
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -107,8 +107,8 @@ sinks:
kafka_sink:
type: kafka
properties:
topic: {{ kafka_sink_topic }}
kafka.bootstrap.servers: { { kafka_sink_bootstrap_servers } }
topic: VOIP-CONVERSATION-RECORD
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
@@ -141,12 +141,4 @@ application:
shade.identifier: aes
pipeline:
object-reuse: true
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink
downstream: []
{{ topology }}

View File

@@ -1,16 +0,0 @@
com.geedgenetworks.core.udf.AsnLookup
com.geedgenetworks.core.udf.CurrentUnixTimestamp
com.geedgenetworks.core.udf.DecodeBase64
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.Drop
com.geedgenetworks.core.udf.EncodeBase64
com.geedgenetworks.core.udf.Eval
com.geedgenetworks.core.udf.FromUnixTimestamp
com.geedgenetworks.core.udf.GenerateStringArray
com.geedgenetworks.core.udf.GeoIpLookup
com.geedgenetworks.core.udf.JsonExtract
com.geedgenetworks.core.udf.PathCombine
com.geedgenetworks.core.udf.Rename
com.geedgenetworks.core.udf.SnowflakeId
com.geedgenetworks.core.udf.StringJoiner
com.geedgenetworks.core.udf.UnixTimestampConverter

View File

@@ -3208,9 +3208,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record_local on clus
device_group String,
traffic_link_id Int32,
source_ip String,
source_port Int32,
source_port Nullable(Int32),
destination_ip String,
destination_port Int32,
destination_port Nullable(Int32),
packet String,
packet_length Int32,
measurements String
@@ -3229,9 +3229,9 @@ CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.datapath_telemetry_record on cluster ck
device_group String,
traffic_link_id Int32,
source_ip String,
source_port Int32,
source_port Nullable(Int32),
destination_ip String,
destination_port Int32,
destination_port Nullable(Int32),
packet String,
packet_length Int32,
measurements String
@@ -3240,4 +3240,101 @@ ENGINE = Distributed('ck_cluster',
'tsg_galaxy_v3',
'datapath_telemetry_record_local',
rand());
CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.traffic_sketch_metric_local on cluster ck_cluster
(
log_id UInt64,
recv_time Int64,
vsys_id Int64,
device_id String,
device_group String,
data_center String,
direction String,
ip_protocol String,
client_ip String,
server_ip String,
internal_ip String,
external_ip String,
client_country String,
server_country String,
client_asn Nullable(Int64),
server_asn Nullable(Int64),
server_fqdn String,
server_domain String,
app String,
app_category String,
c2s_ttl Nullable(Int32),
s2c_ttl Nullable(Int32),
c2s_link_id Nullable(Int32),
s2c_link_id Nullable(Int32),
sessions Int64,
bytes Int64,
sent_bytes Int64,
received_bytes Int64,
pkts Int64,
sent_pkts Int64,
received_pkts Int64,
asymmetric_c2s_flows Int64,
asymmetric_s2c_flows Int64,
c2s_fragments Int64,
s2c_fragments Int64,
c2s_tcp_lost_bytes Int64,
s2c_tcp_lost_bytes Int64,
c2s_tcp_retransmitted_pkts Int64,
s2c_tcp_retransmitted_pkts Int64
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(toDate(recv_time))
ORDER BY (vsys_id,
direction,
ip_protocol,
app,
client_ip,
recv_time);
CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.traffic_sketch_metric on cluster ck_cluster
(
log_id UInt64,
recv_time Int64,
vsys_id Int64,
device_id String,
device_group String,
data_center String,
direction String,
ip_protocol String,
client_ip String,
server_ip String,
internal_ip String,
external_ip String,
client_country String,
server_country String,
client_asn Nullable(Int64),
server_asn Nullable(Int64),
server_fqdn String,
server_domain String,
app String,
app_category String,
c2s_ttl Nullable(Int32),
s2c_ttl Nullable(Int32),
c2s_link_id Nullable(Int32),
s2c_link_id Nullable(Int32),
sessions Int64,
bytes Int64,
sent_bytes Int64,
received_bytes Int64,
pkts Int64,
sent_pkts Int64,
received_pkts Int64,
asymmetric_c2s_flows Int64,
asymmetric_s2c_flows Int64,
c2s_fragments Int64,
s2c_fragments Int64,
c2s_tcp_lost_bytes Int64,
s2c_tcp_lost_bytes Int64,
c2s_tcp_retransmitted_pkts Int64,
s2c_tcp_retransmitted_pkts Int64
)
ENGINE = Distributed('ck_cluster',
'tsg_galaxy_v3',
'traffic_sketch_metric_local',
rand());

View File

@@ -16,6 +16,7 @@ SELECT recv_time, log_id, decoded_as, session_id, start_timestamp_ms, end_timest
FROM tsg_galaxy_v3.voip_record where recv_time >= toUnixTimestamp('2030-01-01 00:00:00') AND recv_time <toUnixTimestamp('2030-01-01 00:00:01');
SELECT log_id, recv_time, vsys_id, timestamp_us, job_id, sled_ip, device_group, traffic_link_id, source_ip, source_port, destination_ip, destination_port, packet, packet_length, measurements
FROM tsg_galaxy_v3.datapath_telemetry_record where recv_time >= toUnixTimestamp('2030-01-01 00:00:00') AND recv_time <toUnixTimestamp('2030-01-01 00:00:01');
SELECT log_id, recv_time, vsys_id, device_id, device_group, data_center, direction, ip_protocol, client_ip, server_ip, internal_ip, external_ip, client_country, server_country, client_asn, server_asn, server_fqdn, server_domain, app, app_category, c2s_ttl, s2c_ttl, c2s_link_id, s2c_link_id, sessions, bytes, sent_bytes, received_bytes, pkts, sent_pkts, received_pkts, asymmetric_c2s_flows, asymmetric_s2c_flows, c2s_fragments, s2c_fragments, c2s_tcp_lost_bytes, s2c_tcp_lost_bytes, c2s_tcp_retransmitted_pkts, s2c_tcp_retransmitted_pkts
FROM tsg_galaxy_v3.traffic_sketch_metric where recv_time >= toUnixTimestamp('2030-01-01 00:00:00') AND recv_time <toUnixTimestamp('2030-01-01 00:00:01');

View File

@@ -993,3 +993,102 @@ SELECT
FROM tsg_galaxy_v3.session_record_local
WHERE empty(monitor_rule_list) = 0
;
CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.traffic_sketch_metric_local on cluster ck_cluster
(
log_id UInt64,
recv_time Int64,
vsys_id Int64,
device_id String,
device_group String,
data_center String,
direction String,
ip_protocol String,
client_ip String,
server_ip String,
internal_ip String,
external_ip String,
client_country String,
server_country String,
client_asn Nullable(Int64),
server_asn Nullable(Int64),
server_fqdn String,
server_domain String,
app String,
app_category String,
c2s_ttl Nullable(Int32),
s2c_ttl Nullable(Int32),
c2s_link_id Nullable(Int32),
s2c_link_id Nullable(Int32),
sessions Int64,
bytes Int64,
sent_bytes Int64,
received_bytes Int64,
pkts Int64,
sent_pkts Int64,
received_pkts Int64,
asymmetric_c2s_flows Int64,
asymmetric_s2c_flows Int64,
c2s_fragments Int64,
s2c_fragments Int64,
c2s_tcp_lost_bytes Int64,
s2c_tcp_lost_bytes Int64,
c2s_tcp_retransmitted_pkts Int64,
s2c_tcp_retransmitted_pkts Int64
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(toDate(recv_time))
ORDER BY (vsys_id,
direction,
ip_protocol,
app,
client_ip,
recv_time);
CREATE TABLE IF NOT EXISTS tsg_galaxy_v3.traffic_sketch_metric on cluster ck_cluster
(
log_id UInt64,
recv_time Int64,
vsys_id Int64,
device_id String,
device_group String,
data_center String,
direction String,
ip_protocol String,
client_ip String,
server_ip String,
internal_ip String,
external_ip String,
client_country String,
server_country String,
client_asn Nullable(Int64),
server_asn Nullable(Int64),
server_fqdn String,
server_domain String,
app String,
app_category String,
c2s_ttl Nullable(Int32),
s2c_ttl Nullable(Int32),
c2s_link_id Nullable(Int32),
s2c_link_id Nullable(Int32),
sessions Int64,
bytes Int64,
sent_bytes Int64,
received_bytes Int64,
pkts Int64,
sent_pkts Int64,
received_pkts Int64,
asymmetric_c2s_flows Int64,
asymmetric_s2c_flows Int64,
c2s_fragments Int64,
s2c_fragments Int64,
c2s_tcp_lost_bytes Int64,
s2c_tcp_lost_bytes Int64,
c2s_tcp_retransmitted_pkts Int64,
s2c_tcp_retransmitted_pkts Int64
)
ENGINE = Distributed('ck_cluster',
'tsg_galaxy_v3',
'traffic_sketch_metric_local',
rand());
create table IF not EXISTS system.distributed_ddl_queue_cluster ON CLUSTER ck_cluster as system.distributed_ddl_queue ENGINE =Distributed(ck_cluster,`system`,distributed_ddl_queue,rand());

View File

@@ -0,0 +1,152 @@
sources:
kafka_source:
type: kafka
properties:
topic: PROXY-EVENT
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: etl_proxy_event_kafka_to_ndc_kafka
kafka.auto.offset.reset: latest
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
properties:
key: value
functions: # [array of object] Function List
- function: ASN_LOOKUP
lookup_fields: [server_ip]
output_fields: [server_asn]
parameters:
option: IP_TO_ASN
kb_name: tsg_ip_asn
- function: ASN_LOOKUP
lookup_fields: [client_ip]
output_fields: [client_asn]
parameters:
option: IP_TO_ASN
kb_name: tsg_ip_asn
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: 1
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [processing_time]
parameters:
precision: seconds
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [rtp_pcap_path]
output_fields: [rtp_pcap_path]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
- function: PATH_COMBINE
lookup_fields: [http_request_body]
output_fields: [http_request_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
- function: PATH_COMBINE
lookup_fields: [http_response_body]
output_fields: [http_response_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
- function: PATH_COMBINE
lookup_fields: [mail_eml_file]
output_fields: [mail_eml_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
- function: PATH_COMBINE
lookup_fields: [packet_capture_file]
output_fields: [packet_capture_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
sinks:
kafka_sink:
type: kafka
properties:
topic: PROXY-EVENT-PROCESSED
kafka.bootstrap.servers: "{{ kafka_sink_servers }}"
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: json
application:
env: # [object] Environment Variables
name: etl_proxy_event_kafka_to_ndc_kafka # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [kafka_sink]
- name: kafka_sink

View File

@@ -0,0 +1,151 @@
sources:
kafka_source:
type: kafka
properties:
topic: SESSION-RECORD
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: etl_session_record_kafka_to_ndc_kafka
kafka.auto.offset.reset: latest
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
properties:
key: value
functions: # [array of object] Function List
- function: ASN_LOOKUP
lookup_fields: [server_ip]
output_fields: [server_asn]
parameters:
option: IP_TO_ASN
kb_name: tsg_ip_asn
- function: ASN_LOOKUP
lookup_fields: [client_ip]
output_fields: [client_asn]
parameters:
option: IP_TO_ASN
kb_name: tsg_ip_asn
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: 1
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [processing_time]
parameters:
precision: seconds
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [rtp_pcap_path]
output_fields: [rtp_pcap_path]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
- function: PATH_COMBINE
lookup_fields: [http_request_body]
output_fields: [http_request_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
- function: PATH_COMBINE
lookup_fields: [http_response_body]
output_fields: [http_response_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
- function: PATH_COMBINE
lookup_fields: [mail_eml_file]
output_fields: [mail_eml_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
- function: PATH_COMBINE
lookup_fields: [packet_capture_file]
output_fields: [packet_capture_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
sinks:
kafka_sink:
type: kafka
properties:
topic: SESSION-RECORD-PROCESSED
kafka.bootstrap.servers: "{{ kafka_sink_servers }}"
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: json
application:
env: # [object] Environment Variables
name: etl_session_record_kafka_to_ndc_kafka # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [kafka_sink]
- name: kafka_sink

View File

@@ -0,0 +1,155 @@
sources:
kafka_source:
type: kafka
# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
# watermark_timestamp: common_recv_time # [string] Watermark Field Name
# watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms
# watermark_lag: 60 # [number] Watermark Lag, default is 60
properties:
topic: TRANSACTION-RECORD
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: etl_transaction_record_kafka_to_ndc_kafka
kafka.auto.offset.reset: latest
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
properties:
key: value
functions: # [array of object] Function List
- function: ASN_LOOKUP
lookup_fields: [server_ip]
output_fields: [server_asn]
parameters:
option: IP_TO_ASN
kb_name: tsg_ip_asn
- function: ASN_LOOKUP
lookup_fields: [client_ip]
output_fields: [client_asn]
parameters:
option: IP_TO_ASN
kb_name: tsg_ip_asn
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: 1
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [processing_time]
parameters:
precision: seconds
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [rtp_pcap_path]
output_fields: [rtp_pcap_path]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
- function: PATH_COMBINE
lookup_fields: [http_request_body]
output_fields: [http_request_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
- function: PATH_COMBINE
lookup_fields: [http_response_body]
output_fields: [http_response_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
- function: PATH_COMBINE
lookup_fields: [mail_eml_file]
output_fields: [mail_eml_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
- function: PATH_COMBINE
lookup_fields: [packet_capture_file]
output_fields: [packet_capture_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
sinks:
kafka_sink:
type: kafka
properties:
topic: TRANSACTION-RECORD-PROCESSED
kafka.bootstrap.servers: "{{ kafka_sink_servers }}"
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: json
application:
env: # [object] Environment Variables
name: etl_transaction_record_kafka_to_ndc_kafka # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [kafka_sink]
- name: kafka_sink

View File

@@ -0,0 +1,118 @@
sources:
kafka_source:
type: kafka
properties:
topic: VOIP-CONVERSATION-RECORD
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: etl_voip_record_kafka_to_clickhouse
kafka.auto.offset.reset: latest
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
properties:
key: value
functions: # [array of object] Function List
- function: ASN_LOOKUP
lookup_fields: [server_ip]
output_fields: [server_asn]
parameters:
option: IP_TO_ASN
kb_name: tsg_ip_asn
- function: ASN_LOOKUP
lookup_fields: [client_ip]
output_fields: [client_asn]
parameters:
option: IP_TO_ASN
kb_name: tsg_ip_asn
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: 1
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [processing_time]
parameters:
precision: seconds
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
sinks:
clickhouse_sink:
type: clickhouse
properties:
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.voip_record_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
connection.connect_timeout: 30
connection.query_timeout: 300
application:
env: # [object] Environment Variables
name: etl_voip_record_kafka_to_clickhouse # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink

View File

@@ -0,0 +1,142 @@
sources:
kafka_source:
type: kafka
# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
# watermark_timestamp: common_recv_time # [string] Watermark Field Name
# watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms
# watermark_lag: 60 # [number] Watermark Lag, default is 60
properties:
topic: PROXY-EVENT
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.ssl.keystore.location:
kafka.ssl.keystore.password:
kafka.ssl.truststore.location:
kafka.ssl.truststore.password:
kafka.ssl.key.password:
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.buffer.memory:
kafka.group.id: etl_proxy_event_kafka_to_clickhouse-20231221
kafka.auto.offset.reset: latest
kafka.max.request.size:
kafka.compression.type: none
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
properties:
key: value
functions: # [array of object] Function List
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: 1
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
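# Illustration (hypothetical payload): the JSONPath expressions above assume device_tag
# carries a JSON document shaped like
#   {"tags": [{"tag": "data_center", "value": "dc-01"}, {"tag": "device_group", "value": "grp-edge"}]}
# With that input, data_center extracts to "dc-01" and device_group to "grp-edge".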
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [processing_time]
parameters:
precision: seconds
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [rtp_pcap_path]
output_fields: [rtp_pcap_path]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
- function: PATH_COMBINE
lookup_fields: [http_request_body]
output_fields: [http_request_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
- function: PATH_COMBINE
lookup_fields: [http_response_body]
output_fields: [http_response_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
- function: PATH_COMBINE
lookup_fields: [mail_eml_file]
output_fields: [mail_eml_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
- function: PATH_COMBINE
lookup_fields: [packet_capture_file]
output_fields: [packet_capture_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
sinks:
clickhouse_sink:
type: clickhouse
properties:
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.proxy_event_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
application:
env: # [object] Environment Variables
name: etl_proxy_event_kafka_to_clickhouse # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink

View File

@@ -0,0 +1,142 @@
sources:
kafka_source:
type: kafka
# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
# watermark_timestamp: common_recv_time # [string] Watermark Field Name
# watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms
# watermark_lag: 60 # [number] Watermark Lag, default is 60
properties:
topic: SESSION-RECORD
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.ssl.keystore.location:
kafka.ssl.keystore.password:
kafka.ssl.truststore.location:
kafka.ssl.truststore.password:
kafka.ssl.key.password:
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.buffer.memory:
kafka.group.id: etl_session_record_kafka_to_clickhouse-20230125
kafka.auto.offset.reset: latest
kafka.max.request.size:
kafka.compression.type: none
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
properties:
key: value
functions: # [array of object] Function List
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: 1
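# Note: SNOWFLAKE_ID takes no input field (lookup_fields is empty) and emits a unique,
# roughly time-ordered 64-bit ID into log_id; data_center_id_num seeds the data-center
# bits so IDs from different deployments do not collide (assumed typical snowflake
# layout: timestamp + data-center/worker bits + per-millisecond sequence).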
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [processing_time]
parameters:
precision: seconds
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [rtp_pcap_path]
output_fields: [rtp_pcap_path]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
- function: PATH_COMBINE
lookup_fields: [http_request_body]
output_fields: [http_request_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
- function: PATH_COMBINE
lookup_fields: [http_response_body]
output_fields: [http_response_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
- function: PATH_COMBINE
lookup_fields: [mail_eml_file]
output_fields: [mail_eml_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
- function: PATH_COMBINE
lookup_fields: [packet_capture_file]
output_fields: [packet_capture_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
sinks:
clickhouse_sink:
type: clickhouse
properties:
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.session_record_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
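# Note (assumed sink semantics): rows are buffered and flushed to ClickHouse when either
# batch.size (100000 rows) or batch.interval (30s) is reached, whichever comes first;
# connection.user/connection.password are stored encrypted, presumably decoded via
# shade.identifier: aes below.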
application:
env: # [object] Environment Variables
name: etl_session_record_kafka_to_clickhouse # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink

View File

@@ -0,0 +1,92 @@
sources:
kafka_source:
type: kafka
properties:
topic: TRAFFIC-SKETCH-METRIC
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: etl_traffic_sketch_metric
kafka.auto.offset.reset: latest
kafka.compression.type: none
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
functions: # [array of object] Function List
- function: FLATTEN
lookup_fields: [ fields,tags ]
output_fields: [ ]
parameters:
#prefix: ""
depth: 3
# delimiter: "."
- function: RENAME
lookup_fields: [ '' ]
output_fields: [ '' ]
filter:
parameters:
# parent_fields: [tags]
# rename_fields:
#   tags: tags
rename_expression: key = string.replace_all(key, 'tags.', ''); key = string.replace_all(key, 'fields.', ''); return key;
- function: EVAL
output_fields: [ internal_ip ]
parameters:
value_expression: "direction == 'Outbound' ? client_ip : server_ip"
- function: EVAL
output_fields: [ external_ip ]
parameters:
value_expression: "direction == 'Outbound' ? server_ip : client_ip"
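# Worked example (hypothetical record): an input such as
#   { "fields": { "sessions": 42 }, "tags": { "direction": "Outbound", "client_ip": "10.0.0.8", "server_ip": "203.0.113.5" } }
# is flattened to fields.sessions / tags.direction / ..., the RENAME expression strips
# the "fields." and "tags." prefixes, and the EVALs then pick internal_ip = 10.0.0.8 and
# external_ip = 203.0.113.5 because direction is 'Outbound'.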
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ timestamp_ms ]
output_fields: [ recv_time ]
parameters:
precision: seconds
- function: SNOWFLAKE_ID
lookup_fields: [ '' ]
output_fields: [ log_id ]
filter:
parameters:
data_center_id_num: 1
sinks:
clickhouse_sink:
type: clickhouse
properties:
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.traffic_sketch_metric_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
application:
env: # [object] Environment Variables
name: etl_traffic_sketch_metric # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink

View File

@@ -0,0 +1,140 @@
sources:
kafka_source:
type: kafka
# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
# watermark_timestamp: common_recv_time # [string] Watermark Field Name
# watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms
# watermark_lag: 60 # [number] Watermark Lag, default is 60
properties:
topic: TRANSACTION-RECORD
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.ssl.keystore.location:
kafka.ssl.keystore.password:
kafka.ssl.truststore.location:
kafka.ssl.truststore.password:
kafka.ssl.key.password:
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.buffer.memory:
kafka.group.id: etl_transaction_record_kafka_to_clickhouse-20240308
kafka.auto.offset.reset: latest
kafka.max.request.size:
kafka.compression.type: none
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
properties:
key: value
functions: # [array of object] Function List
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: 1
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [processing_time]
parameters:
precision: seconds
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
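# Illustration (assuming __timestamp is the Kafka record timestamp in milliseconds, per
# the default unit noted in the commented watermark settings above): with
# precision: seconds, e.g. 1719900000123 becomes recv_time = 1719900000. The EVAL below
# then copies recv_time into ingestion_time.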
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [rtp_pcap_path]
output_fields: [rtp_pcap_path]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
- function: PATH_COMBINE
lookup_fields: [http_request_body]
output_fields: [http_request_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
- function: PATH_COMBINE
lookup_fields: [http_response_body]
output_fields: [http_response_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
- function: PATH_COMBINE
lookup_fields: [mail_eml_file]
output_fields: [mail_eml_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
- function: PATH_COMBINE
lookup_fields: [packet_capture_file]
output_fields: [packet_capture_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
sinks:
clickhouse_sink:
type: clickhouse
properties:
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.transaction_record_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
application:
env: # [object] Environment Variables
name: etl_transaction_record_kafka_to_clickhouse # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink

View File

@@ -0,0 +1,142 @@
sources:
kafka_source:
type: kafka
# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
# watermark_timestamp: common_recv_time # [string] Watermark Field Name
# watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms
# watermark_lag: 60 # [number] Watermark Lag, default is 60
properties:
topic: VOIP-CONVERSATION-RECORD
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.ssl.keystore.location:
kafka.ssl.keystore.password:
kafka.ssl.truststore.location:
kafka.ssl.truststore.password:
kafka.ssl.key.password:
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.buffer.memory:
kafka.group.id: etl_voip_record_kafka_to_clickhouse-20231221
kafka.auto.offset.reset: latest
kafka.max.request.size:
kafka.compression.type: none
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
properties:
key: value
functions: # [array of object] Function List
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: 1
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [processing_time]
parameters:
precision: seconds
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [rtp_pcap_path]
output_fields: [rtp_pcap_path]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
- function: PATH_COMBINE
lookup_fields: [http_request_body]
output_fields: [http_request_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
- function: PATH_COMBINE
lookup_fields: [http_response_body]
output_fields: [http_response_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
- function: PATH_COMBINE
lookup_fields: [mail_eml_file]
output_fields: [mail_eml_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
- function: PATH_COMBINE
lookup_fields: [packet_capture_file]
output_fields: [packet_capture_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
sinks:
clickhouse_sink:
type: clickhouse
properties:
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.voip_record_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
application:
env: # [object] Environment Variables
name: etl_voip_record_kafka_to_clickhouse # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [etl_processor]
- name: etl_processor
downstream: [clickhouse_sink]
- name: clickhouse_sink

View File

@@ -0,0 +1,54 @@
sources:
kafka_source:
type: kafka
properties:
topic: {{ kafka_source_topic }}
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: {{ kafka_source_group_id }}
kafka.auto.offset.reset: latest
format: raw
sinks:
kafka_sink:
type: kafka
properties:
topic: {{ kafka_sink_topic }}
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: raw
clickhouse_sink:
type: clickhouse
properties:
host: {{ clickhouse_servers }}
table: tsg_galaxy_v3.traffic_sketch_metric_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
application:
env:
name: {{ job_name }}
shade.identifier: aes
pipeline:
object-reuse: true
topology:
{{ topology }}
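# Illustration (hypothetical rendering): for a pure Kafka-to-Kafka copy, the
# {{ topology }} variable would expand to something like
#   - name: kafka_source
#     downstream: [kafka_sink]
#   - name: kafka_sink
# mirroring the explicit topology blocks in the non-templated jobs above.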

View File

@@ -0,0 +1,77 @@
sources:
kafka_source:
type: kafka
properties:
topic: DATAPATH-TELEMETRY-RECORD
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: {{ kafka_source_group_id }}
kafka.auto.offset.reset: latest
format: msgpack
processing_pipelines:
etl_processor:
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
functions:
- function: SNOWFLAKE_ID
lookup_fields: [ '' ]
output_fields: [ log_id ]
parameters:
data_center_id_num: {{ data_center_id_num }}
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ __timestamp ]
output_fields: [ recv_time ]
parameters:
precision: seconds
- function: BASE64_ENCODE_TO_STRING
output_fields: [ packet ]
parameters:
value_field: packet
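# Note (assumed rationale, inferred from format: msgpack in and format: raw out): packet
# is a raw binary field in the msgpack payload; Base64-encoding it makes the record safe
# to re-serialize as text for the Kafka sink below.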
sinks:
kafka_sink:
type: kafka
properties:
topic: DATAPATH-TELEMETRY-RECORD
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: raw
json.ignore.parse.errors: false
log.failures.only: true
clickhouse_sink:
type: clickhouse
properties:
host: {{ clickhouse_sink_host }}
table: tsg_galaxy_v3.datapath_telemetry_record_local
batch.size: 5000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
connection.connect_timeout: 30
connection.query_timeout: 300
application:
env:
name: {{ job_name }}
shade.identifier: aes
pipeline:
object-reuse: true
topology:
{{ topology }}

View File

@@ -0,0 +1,49 @@
sources:
kafka_source:
type: kafka
properties:
topic: DOS-EVENT
kafka.bootstrap.servers: "{{ kafka_source_servers }}"
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.ssl.keystore.location:
kafka.ssl.keystore.password:
kafka.ssl.truststore.location:
kafka.ssl.truststore.password:
kafka.ssl.key.password:
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.buffer.memory:
kafka.group.id: dos_event_kafka_to_clickhouse-20231221
kafka.auto.offset.reset: latest
kafka.max.request.size:
kafka.compression.type: none
format: json
sinks:
clickhouse_sink:
type: clickhouse
properties:
host: "{{ clickhouse_servers }}"
table: tsg_galaxy_v3.dos_event_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
application:
env:
name: dos_event_kafka_to_clickhouse
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
- name: kafka_source
downstream: [clickhouse_sink]
- name: clickhouse_sink
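# Note: this job defines no processing pipeline; DOS-EVENT records are written to
# ClickHouse exactly as consumed, with the source wired directly to the sink.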

View File

@@ -0,0 +1,146 @@
sources:
kafka_source:
type: kafka
properties:
topic: PROXY-EVENT
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: {{ kafka_source_group_id }}
kafka.auto.offset.reset: latest
format: json
json.ignore.parse.errors: false
processing_pipelines:
etl_processor:
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
functions:
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: {{ data_center_id_num }}
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
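# Illustration (assumed semantics, similar to ClickHouse's firstSignificantSubdomain):
# presumably the first non-empty field among http_host, ssl_sni, dtls_sni and quic_sni
# is reduced to its most significant label, e.g. "cdn.news.example.com" -> "example".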
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [rtp_pcap_path]
output_fields: [rtp_pcap_path]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
- function: PATH_COMBINE
lookup_fields: [http_request_body]
output_fields: [http_request_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
- function: PATH_COMBINE
lookup_fields: [http_response_body]
output_fields: [http_response_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
- function: PATH_COMBINE
lookup_fields: [mail_eml_file]
output_fields: [mail_eml_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
- function: PATH_COMBINE
lookup_fields: [packet_capture_file]
output_fields: [packet_capture_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [ processing_time ]
parameters:
precision: seconds
sinks:
kafka_sink:
type: kafka
properties:
topic: PROXY-EVENT
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: json
json.ignore.parse.errors: false
log.failures.only: true
clickhouse_sink:
type: clickhouse
properties:
host: {{ clickhouse_sink_host }}
table: tsg_galaxy_v3.proxy_event_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
connection.connect_timeout: 30
connection.query_timeout: 300
application:
env:
name: {{ job_name }}
shade.identifier: aes
pipeline:
object-reuse: true
topology:
{{ topology }}

View File

@@ -0,0 +1,144 @@
sources:
kafka_source:
type: kafka
properties:
topic: SESSION-RECORD
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: {{ kafka_source_group_id }}
kafka.auto.offset.reset: latest
format: json
json.ignore.parse.errors: false
processing_pipelines:
etl_processor:
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
functions:
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: {{ data_center_id_num }}
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [rtp_pcap_path]
output_fields: [rtp_pcap_path]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
- function: PATH_COMBINE
lookup_fields: [http_request_body]
output_fields: [http_request_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
- function: PATH_COMBINE
lookup_fields: [http_response_body]
output_fields: [http_response_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
- function: PATH_COMBINE
lookup_fields: [mail_eml_file]
output_fields: [mail_eml_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
- function: PATH_COMBINE
lookup_fields: [packet_capture_file]
output_fields: [packet_capture_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [ processing_time ]
parameters:
precision: seconds
sinks:
kafka_sink:
type: kafka
properties:
topic: SESSION-RECORD
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: json
json.ignore.parse.errors: false
log.failures.only: true
clickhouse_sink:
type: clickhouse
properties:
host: {{ clickhouse_sink_host }}
table: tsg_galaxy_v3.session_record_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
connection.connect_timeout: 30
connection.query_timeout: 300
application:
env:
name: {{ job_name }}
shade.identifier: aes
pipeline:
object-reuse: true
topology:
{{ topology }}

View File

@@ -0,0 +1,106 @@
sources:
kafka_source:
type: kafka
properties:
topic: TRAFFIC-SKETCH-METRIC
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: etl_traffic_sketch_metric
kafka.auto.offset.reset: latest
kafka.compression.type: none
format: json
processing_pipelines:
etl_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
remove_fields:
output_fields:
functions: # [array of object] Function List
- function: FLATTEN
lookup_fields: [ fields,tags ]
output_fields: [ ]
parameters:
#prefix: ""
depth: 3
# delimiter: "."
- function: RENAME
lookup_fields: [ '' ]
output_fields: [ '' ]
filter:
parameters:
# parent_fields: [tags]
# rename_fields:
#   tags: tags
rename_expression: key = string.replace_all(key, 'tags.', ''); key = string.replace_all(key, 'fields.', ''); return key;
- function: EVAL
output_fields: [ internal_ip ]
parameters:
value_expression: "direction == 'Outbound' ? client_ip : server_ip"
- function: EVAL
output_fields: [ external_ip ]
parameters:
value_expression: "direction == 'Outbound' ? server_ip : client_ip"
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ timestamp_ms ]
output_fields: [ recv_time ]
parameters:
precision: seconds
- function: SNOWFLAKE_ID
lookup_fields: [ '' ]
output_fields: [ log_id ]
filter:
parameters:
data_center_id_num: 1
sinks:
kafka_sink:
type: kafka
properties:
topic: TRAFFIC-SKETCH-METRIC
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: json
json.ignore.parse.errors: false
log.failures.only: true
clickhouse_sink:
type: clickhouse
properties:
host: {{ clickhouse_servers }}
table: tsg_galaxy_v3.traffic_sketch_metric_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
application:
env: # [object] Environment Variables
name: etl_traffic_sketch_metric # [string] Job Name
shade.identifier: aes
pipeline:
object-reuse: true # [boolean] Object Reuse, default is false
topology:
{{ topology }}
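# Illustration (hypothetical rendering): with both sinks defined, {{ topology }} would
# typically expand to a fan-out such as
#   - name: kafka_source
#     downstream: [etl_processor]
#   - name: etl_processor
#     downstream: [kafka_sink, clickhouse_sink]
#   - name: kafka_sink
#   - name: clickhouse_sink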

View File

@@ -0,0 +1,144 @@
sources:
kafka_source:
type: kafka
properties:
topic: TRANSACTION-RECORD
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: {{ kafka_source_group_id }}
kafka.auto.offset.reset: latest
format: json
json.ignore.parse.errors: false
processing_pipelines:
etl_processor:
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
functions:
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: {{ data_center_id_num }}
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [rtp_pcap_path]
output_fields: [rtp_pcap_path]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
- function: PATH_COMBINE
lookup_fields: [http_request_body]
output_fields: [http_request_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
- function: PATH_COMBINE
lookup_fields: [http_response_body]
output_fields: [http_response_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
- function: PATH_COMBINE
lookup_fields: [mail_eml_file]
output_fields: [mail_eml_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
- function: PATH_COMBINE
lookup_fields: [packet_capture_file]
output_fields: [packet_capture_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [ processing_time ]
parameters:
precision: seconds
sinks:
kafka_sink:
type: kafka
properties:
topic: TRANSACTION-RECORD
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: json
json.ignore.parse.errors: false
log.failures.only: true
clickhouse_sink:
type: clickhouse
properties:
host: {{ clickhouse_sink_host }}
table: tsg_galaxy_v3.transaction_record_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
connection.connect_timeout: 30
connection.query_timeout: 300
application:
env:
name: {{ job_name }}
shade.identifier: aes
pipeline:
object-reuse: true
topology:
{{ topology }}

View File

@@ -0,0 +1,144 @@
sources:
kafka_source:
type: kafka
properties:
topic: VOIP-CONVERSATION-RECORD
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: {{ kafka_source_group_id }}
kafka.auto.offset.reset: latest
format: json
json.ignore.parse.errors: false
processing_pipelines:
etl_processor:
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
functions:
- function: SNOWFLAKE_ID
lookup_fields: ['']
output_fields: [log_id]
parameters:
data_center_id_num: {{ data_center_id_num }}
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [data_center]
filter:
parameters:
value_expression: $.tags[?(@.tag=='data_center')][0].value
- function: JSON_EXTRACT
lookup_fields: [device_tag]
output_fields: [device_group]
filter:
parameters:
value_expression: $.tags[?(@.tag=='device_group')][0].value
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [__timestamp]
output_fields: [recv_time]
parameters:
precision: seconds
- function: EVAL
output_fields: [ingestion_time]
parameters:
value_expression: recv_time
- function: DOMAIN
lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni]
output_fields: [server_domain]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_subject]
parameters:
value_field: mail_subject
charset_field: mail_subject_charset
- function: BASE64_DECODE_TO_STRING
output_fields: [mail_attachment_name]
parameters:
value_field: mail_attachment_name
charset_field: mail_attachment_name_charset
- function: PATH_COMBINE
lookup_fields: [rtp_pcap_path]
output_fields: [rtp_pcap_path]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path]
- function: PATH_COMBINE
lookup_fields: [http_request_body]
output_fields: [http_request_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body]
- function: PATH_COMBINE
lookup_fields: [http_response_body]
output_fields: [http_response_body]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body]
- function: PATH_COMBINE
lookup_fields: [mail_eml_file]
output_fields: [mail_eml_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file]
- function: PATH_COMBINE
lookup_fields: [packet_capture_file]
output_fields: [packet_capture_file]
parameters:
path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file]
- function: CURRENT_UNIX_TIMESTAMP
output_fields: [ processing_time ]
parameters:
precision: seconds
sinks:
kafka_sink:
type: kafka
properties:
topic: VOIP-CONVERSATION-RECORD
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: json
json.ignore.parse.errors: false
log.failures.only: true
clickhouse_sink:
type: clickhouse
properties:
host: {{ clickhouse_sink_host }}
table: tsg_galaxy_v3.voip_record_local
batch.size: 100000
batch.interval: 30s
connection.user: e54c9568586180eede1506eecf3574e9
connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e
connection.connect_timeout: 30
connection.query_timeout: 300
application:
env:
name: {{ job_name }}
shade.identifier: aes
pipeline:
object-reuse: true
topology:
{{ topology }}