This commit is contained in:
wangkuan
2024-06-14 18:29:11 +08:00
7 changed files with 106 additions and 75 deletions

View File

@@ -1,2 +1,4 @@
v1.2.4 (2024-04-08)
https://git.mesalab.cn/galaxy/platform/groot-stream/-/releases/v1.2.4
groot-stream version > 1.4.0
etl_session_record_kafka_to_cn_kafka 需要根据部署环境确定数据源的topic是SESSION-RECORD还是SESSION-RECORD-PROCESSED

View File

@@ -3,7 +3,7 @@ sources:
type: kafka
# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties: # [object] Source Properties
topic: SESSION-RECORD-PROCESSED
topic: {{ tsg_olap_kafka_session_record_or_session_record_processed_topic }} # SESSION-RECORD/SESSION-RECORD-PROCESSED
kafka.bootstrap.servers: {{ tsg_olap_kafka_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
@@ -11,7 +11,7 @@ sources:
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
kafka.group.id: etl_processed_session_record_kafka_to_cn_kafka
kafka.group.id: etl_session_record_kafka_to_cn_kafka
kafka.auto.offset.reset: latest
format: json
@@ -21,6 +21,28 @@ processing_pipelines:
remove_fields:
output_fields:
functions: # [array of object] Function List
- function: SNOWFLAKE_ID
lookup_fields: [ '' ]
output_fields: [ cn_log_id ]
parameters:
data_center_id_num: 1
- function: EVAL
output_fields: [ log_id ]
parameters:
value_expression: "log_id == null ? cn_log_id : log_id"
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ __timestamp ]
output_fields: [ kafka_recv_time ]
parameters:
precision: seconds
- function: EVAL
output_fields: [ recv_time ]
parameters:
value_expression: "recv_time == null ? kafka_recv_time : recv_time"
- function: EVAL
output_fields: [ domain ]
parameters:
@@ -261,19 +283,19 @@ processing_pipelines:
parameters:
value_expression: "client_zone == 'external' ? sessions : external_query_num"
- function: CN_VPN_LOOKUP
- function: CN_ANONYMITY_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_vpn_service_name ]
output_fields: [ server_node_type ]
parameters:
kb_name: cn_vpn_learning_ip
option: IP_TO_VPN
kb_name: cn_ioc_darkweb
option: IP_TO_NODE_TYPE
- function: CN_VPN_LOOKUP
- function: CN_ANONYMITY_LOOKUP
lookup_fields: [ domain ]
output_fields: [ domain_vpn_service_name ]
output_fields: [ domain_node_type ]
parameters:
kb_name: cn_vpn_learning_domain
option: DOMAIN_TO_VPN
kb_name: cn_ioc_darkweb
option: DOMAIN_TO_NODE_TYPE
- function: CN_IOC_LOOKUP
lookup_fields: [ server_ip ]
@@ -289,69 +311,39 @@ processing_pipelines:
kb_name: cn_ioc_malware
option: DOMAIN_TO_MALWARE
- function: CN_USER_DEFINE_TAG_LOOKUP
- function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ client_ip ]
output_fields: [ client_ip_tags ]
parameters:
kb_name: cn_ip_tag_user_define
kb_name: cn_intelligence_indicator
option: IP_TO_TAG
- function: CN_USER_DEFINE_TAG_LOOKUP
- function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ server_ip ]
output_fields: [ server_ip_tags ]
parameters:
kb_name: cn_ip_tag_user_define
kb_name: cn_intelligence_indicator
option: IP_TO_TAG
- function: CN_USER_DEFINE_TAG_LOOKUP
- function: CN_INTELLIGENCE_INDICATOR_LOOKUP
lookup_fields: [ domain ]
output_fields: [ domain_tags ]
parameters:
kb_name: cn_domain_tag_user_define
kb_name: cn_intelligence_indicator
option: DOMAIN_TO_TAG
- function: CN_USER_DEFINE_TAG_LOOKUP
lookup_fields: [ app ]
output_fields: [ app_tags ]
parameters:
kb_name: cn_app_tag_user_define
option: APP_TO_TAG
- function: GENERATE_STRING_ARRAY
lookup_fields: [ client_idc_renter,client_ip_tags ]
output_fields: [ client_ip_tags ]
- function: GENERATE_STRING_ARRAY
lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ]
lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_ip_tags ]
output_fields: [ server_ip_tags ]
- function: GENERATE_STRING_ARRAY
lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ]
lookup_fields: [ domain_node_type,domain_malware,domain_tags ]
output_fields: [ domain_tags ]
- function: CN_ARRAY_ELEMENTS_PREPEND
lookup_fields: [ client_ip_tags ]
output_fields: [ client_ip_tags ]
parameters:
prefix: ip.
- function: CN_ARRAY_ELEMENTS_PREPEND
lookup_fields: [ server_ip_tags ]
output_fields: [ server_ip_tags ]
parameters:
prefix: ip.
- function: CN_ARRAY_ELEMENTS_PREPEND
lookup_fields: [ domain_tags ]
output_fields: [ domain_tags ]
parameters:
prefix: domain.
- function: CN_ARRAY_ELEMENTS_PREPEND
lookup_fields: [ app_tags ]
output_fields: [ app_tags ]
parameters:
prefix: app.
postprocessing_pipelines:
remove_field_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
@@ -377,7 +369,7 @@ sinks:
application:
env:
name: etl_session_record_processed_kafka_to_cn_kafka
name: etl_session_record_kafka_to_cn_kafka
shade.identifier: aes
pipeline:
object-reuse: true

View File

@@ -60,18 +60,6 @@ grootstream:
files:
- 12
- name: cn_vpn_learning_ip
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base
files:
- 15
- name: cn_vpn_learning_domain
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base
files:
- 14
- name: cn_ioc_darkweb
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base
@@ -84,17 +72,11 @@ grootstream:
files:
- 7
- name: cn_ip_tag_user_define
- name: cn_intelligence_indicator
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_ip_tag_user_defined
- name: cn_domain_tag_user_define
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_domain_tag_user_defined
- name: cn_app_tag_user_define
fs_type: http
fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_app_tag_user_defined
fs_path: http://192.168.44.55:9999/v1/knowledge_base
files:
- 16
- name: cn_rule
fs_type: http

View File

@@ -14,6 +14,8 @@ com.geedgenetworks.core.udf.cn.IpZoneLookup
com.geedgenetworks.core.udf.cn.VpnLookup
com.geedgenetworks.core.udf.cn.AnonymityLookup
com.geedgenetworks.core.udf.cn.IocLookup
com.geedgenetworks.core.udf.cn.UserDefineTagLookup
com.geedgenetworks.core.udf.cn.FieldsMerge
com.geedgenetworks.core.udf.cn.ArrayElementsPrepend
com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup
com.geedgenetworks.core.udf.SnowflakeId
com.geedgenetworks.core.udf.UnixTimestampConverter

View File

@@ -0,0 +1,12 @@
# session-record-cn
#cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence
# pre-metrics
cn.pre.metric.class=com.zdjizhi.pre.base.CnPreMetric
# relation
cn.pre.relation.metric.class=com.zdjizhi.pre.relation.CnRelationMetric
# dns-metrics
cn.dns.pre.metric.class=com.zdjizhi.pre.dns.DnsPreMetric
# detection
cn.detection.indicator.class=com.zdjizhi.schedule.indicator.IndicatorSchedule
# location
cn.location.metric.class=com.zdjizhi.pre.location.LocationMetric

View File

@@ -0,0 +1,41 @@
# job name
stream.execution.job.name=cn_stream
# default parallelism
stream.execution.environment.parallelism={{ flink.cn_stream.parallelism }}
# kafka source parallelism
session.record.completed.parallelism={{ flink.cn_stream.parallelism }}
# session-record-cn sink parallelism
cn.record.parallelism={{ flink.cn_stream.parallelism }}
# pre-metrics sink parallelism
metric.output.parallelism={{ flink.cn_stream.parallelism }}
# dns-metrics sink parallelism
dns.metric.output.parallelism={{ flink.cn_stream.parallelism }}
# relation sink parallelism
metric.entity.relation.output.parallelism={{ flink.cn_stream.parallelism }}
# dynamic attribute sink parallelism
metric.dynamic.attribute.output.parallelism={{ flink.cn_stream.parallelism }}
# subscriber-app relation sink parallelism
metric.subscriber.app.relation.output.parallelism={{ flink.cn_stream.parallelism }}
# location sink parallelism
location.metric.output.parallelism={{ flink.cn_stream.parallelism }}
# kafka consumer
kafka.input.bootstrap.servers={{ kafka_source_servers }}
session.record.completed.topic=SESSION-RECORD-CN
session.record.completed.group.id=session-record-cn-stream
# kafka consumer sasl 0:off 1:on
input.sasl.jaas.config.flag=1
# clickhouse
clickhouse.address={{ clickhouse_servers }}
clickhouse.user=LXDp+zqdQqDIIqaDfqsKoA==
clickhouse.password=RY+0nruXpPqITsQ3ob4P7Qbd8W246+Pa
clickhouse.config.connect_timeout=30
clickhouse.config.query_timeout=300
# flink checkpoint 0:off 1:on
flink.enable.checkpoint.flag=0
# api detection url
rule.full.url=http://{{ vrrp_instance.cn_ui.virtual_ipaddress }}:8090/v1/rule/detection
rule.inc.url=http://{{ vrrp_instance.cn_ui.virtual_ipaddress }}:8090/v1/rule/detection/increase
# gateway host
gateway.host={{ vrrp_instance.default.virtual_ipaddress }}
# warkmark
watermark.seconds=1

View File

@@ -1,4 +1,4 @@
groot-stream version > 1.3.0
groot-stream version > 1.4.0
etl_session_record_kafka_to_cn_kafka 需要根据部署环境确定数据源的topic是SESSION-RECORD还是SESSION-RECORD-PROCESSED