diff --git a/cyber_narrator/installation/groot_stream/README.md b/cyber_narrator/installation/groot_stream/README.md index 395ba4f..f195e3f 100644 --- a/cyber_narrator/installation/groot_stream/README.md +++ b/cyber_narrator/installation/groot_stream/README.md @@ -1,2 +1,4 @@ -v1.2.4 (2024-04-08) -https://git.mesalab.cn/galaxy/platform/groot-stream/-/releases/v1.2.4 + +groot-stream version > 1.4.0 + +etl_session_record_kafka_to_cn_kafka 需要根据部署环境确定数据源的topic是SESSION-RECORD还是SESSION-RECORD-PROCESSED \ No newline at end of file diff --git a/cyber_narrator/installation/groot_stream/etl_session_record_processed_kafka_to_cn_kafka b/cyber_narrator/installation/groot_stream/etl_session_record_kafka_to_cn_kafka similarity index 89% rename from cyber_narrator/installation/groot_stream/etl_session_record_processed_kafka_to_cn_kafka rename to cyber_narrator/installation/groot_stream/etl_session_record_kafka_to_cn_kafka index 6e90df7..82b4855 100644 --- a/cyber_narrator/installation/groot_stream/etl_session_record_processed_kafka_to_cn_kafka +++ b/cyber_narrator/installation/groot_stream/etl_session_record_kafka_to_cn_kafka @@ -3,7 +3,7 @@ sources: type: kafka # fields: # [array of object] Field List, if not set, all fields(Map) will be output. properties: # [object] Source Properties - topic: SESSION-RECORD-PROCESSED + topic: {{ tsg_olap_kafka_session_record_or_session_record_processed_topic }} # SESSION-RECORD/SESSION-RECORD-PROCESSED kafka.bootstrap.servers: {{ tsg_olap_kafka_servers }} kafka.session.timeout.ms: 60000 kafka.max.poll.records: 3000 @@ -11,7 +11,7 @@ sources: kafka.security.protocol: SASL_PLAINTEXT kafka.sasl.mechanism: PLAIN kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 - kafka.group.id: etl_processed_session_record_kafka_to_cn_kafka + kafka.group.id: etl_session_record_kafka_to_cn_kafka kafka.auto.offset.reset: latest format: json @@ -21,6 +21,28 @@ processing_pipelines: remove_fields: output_fields: functions: # [array of object] Function List + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ cn_log_id ] + parameters: + data_center_id_num: 1 + + - function: EVAL + output_fields: [ log_id ] + parameters: + value_expression: "log_id == null ? cn_log_id : log_id" + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ kafka_recv_time ] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ recv_time ] + parameters: + value_expression: "recv_time == null ? kafka_recv_time : recv_time" + - function: EVAL output_fields: [ domain ] parameters: @@ -261,19 +283,19 @@ processing_pipelines: parameters: value_expression: "client_zone == 'external' ? sessions : external_query_num" - - function: CN_VPN_LOOKUP + - function: CN_ANONYMITY_LOOKUP lookup_fields: [ server_ip ] - output_fields: [ server_vpn_service_name ] + output_fields: [ server_node_type ] parameters: - kb_name: cn_vpn_learning_ip - option: IP_TO_VPN + kb_name: cn_ioc_darkweb + option: IP_TO_NODE_TYPE - - function: CN_VPN_LOOKUP + - function: CN_ANONYMITY_LOOKUP lookup_fields: [ domain ] - output_fields: [ domain_vpn_service_name ] + output_fields: [ domain_node_type ] parameters: - kb_name: cn_vpn_learning_domain - option: DOMAIN_TO_VPN + kb_name: cn_ioc_darkweb + option: DOMAIN_TO_NODE_TYPE - function: CN_IOC_LOOKUP lookup_fields: [ server_ip ] @@ -289,69 +311,39 @@ processing_pipelines: kb_name: cn_ioc_malware option: DOMAIN_TO_MALWARE - - function: CN_USER_DEFINE_TAG_LOOKUP + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP lookup_fields: [ client_ip ] output_fields: [ client_ip_tags ] parameters: - kb_name: cn_ip_tag_user_define + kb_name: cn_intelligence_indicator option: IP_TO_TAG - - function: CN_USER_DEFINE_TAG_LOOKUP + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP lookup_fields: [ server_ip ] output_fields: [ server_ip_tags ] parameters: - kb_name: cn_ip_tag_user_define + kb_name: cn_intelligence_indicator option: IP_TO_TAG - - function: CN_USER_DEFINE_TAG_LOOKUP + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP lookup_fields: [ domain ] output_fields: [ domain_tags ] parameters: - kb_name: cn_domain_tag_user_define + kb_name: cn_intelligence_indicator option: DOMAIN_TO_TAG - - function: CN_USER_DEFINE_TAG_LOOKUP - lookup_fields: [ app ] - output_fields: [ app_tags ] - parameters: - kb_name: cn_app_tag_user_define - option: APP_TO_TAG - - function: GENERATE_STRING_ARRAY lookup_fields: [ client_idc_renter,client_ip_tags ] output_fields: [ client_ip_tags ] - function: GENERATE_STRING_ARRAY - lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ] + lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_ip_tags ] output_fields: [ server_ip_tags ] - function: GENERATE_STRING_ARRAY - lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ] + lookup_fields: [ domain_node_type,domain_malware,domain_tags ] output_fields: [ domain_tags ] - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ client_ip_tags ] - output_fields: [ client_ip_tags ] - parameters: - prefix: ip. - - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ server_ip_tags ] - output_fields: [ server_ip_tags ] - parameters: - prefix: ip. - - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ domain_tags ] - output_fields: [ domain_tags ] - parameters: - prefix: domain. - - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ app_tags ] - output_fields: [ app_tags ] - parameters: - prefix: app. postprocessing_pipelines: remove_field_processor: # [object] Processing Pipeline type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl @@ -377,7 +369,7 @@ sinks: application: env: - name: etl_session_record_processed_kafka_to_cn_kafka + name: etl_session_record_kafka_to_cn_kafka shade.identifier: aes pipeline: object-reuse: true diff --git a/cyber_narrator/installation/groot_stream/grootstream.yaml b/cyber_narrator/installation/groot_stream/grootstream.yaml index 7e75185..e65976a 100644 --- a/cyber_narrator/installation/groot_stream/grootstream.yaml +++ b/cyber_narrator/installation/groot_stream/grootstream.yaml @@ -60,18 +60,6 @@ grootstream: files: - 12 - - name: cn_vpn_learning_ip - fs_type: http - fs_path: http://192.168.44.55:9999/v1/knowledge_base - files: - - 15 - - - name: cn_vpn_learning_domain - fs_type: http - fs_path: http://192.168.44.55:9999/v1/knowledge_base - files: - - 14 - - name: cn_ioc_darkweb fs_type: http fs_path: http://192.168.44.55:9999/v1/knowledge_base @@ -84,17 +72,11 @@ grootstream: files: - 7 - - name: cn_ip_tag_user_define + - name: cn_intelligence_indicator fs_type: http - fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_ip_tag_user_defined - - - name: cn_domain_tag_user_define - fs_type: http - fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_domain_tag_user_defined - - - name: cn_app_tag_user_define - fs_type: http - fs_path: http://192.168.44.55:9999/v1/knowledge_base?category=cn_app_tag_user_defined + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 16 - name: cn_rule fs_type: http diff --git a/cyber_narrator/installation/groot_stream/udf.plugins b/cyber_narrator/installation/groot_stream/udf.plugins index 8ecce84..eb9fd57 100644 --- a/cyber_narrator/installation/groot_stream/udf.plugins +++ b/cyber_narrator/installation/groot_stream/udf.plugins @@ -14,6 +14,8 @@ com.geedgenetworks.core.udf.cn.IpZoneLookup com.geedgenetworks.core.udf.cn.VpnLookup com.geedgenetworks.core.udf.cn.AnonymityLookup com.geedgenetworks.core.udf.cn.IocLookup -com.geedgenetworks.core.udf.cn.UserDefineTagLookup com.geedgenetworks.core.udf.cn.FieldsMerge com.geedgenetworks.core.udf.cn.ArrayElementsPrepend +com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup +com.geedgenetworks.core.udf.SnowflakeId +com.geedgenetworks.core.udf.UnixTimestampConverter diff --git a/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/business.properties b/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/business.properties new file mode 100644 index 0000000..133582c --- /dev/null +++ b/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/business.properties @@ -0,0 +1,12 @@ +# session-record-cn +#cn.record.etl.class=com.zdjizhi.etl.CnRecordPersistence +# pre-metrics +cn.pre.metric.class=com.zdjizhi.pre.base.CnPreMetric +# relation +cn.pre.relation.metric.class=com.zdjizhi.pre.relation.CnRelationMetric +# dns-metrics +cn.dns.pre.metric.class=com.zdjizhi.pre.dns.DnsPreMetric +# detection +cn.detection.indicator.class=com.zdjizhi.schedule.indicator.IndicatorSchedule +# location +cn.location.metric.class=com.zdjizhi.pre.location.LocationMetric \ No newline at end of file diff --git a/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/common.properties b/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/common.properties new file mode 100644 index 0000000..3d9a819 --- /dev/null +++ b/cyber_narrator/upgrade/2024/CN-24.04/cn_stream/common.properties @@ -0,0 +1,41 @@ +# job name +stream.execution.job.name=cn_stream +# default parallelism +stream.execution.environment.parallelism={{ flink.cn_stream.parallelism }} +# kafka source parallelism +session.record.completed.parallelism={{ flink.cn_stream.parallelism }} +# session-record-cn sink parallelism +cn.record.parallelism={{ flink.cn_stream.parallelism }} +# pre-metrics sink parallelism +metric.output.parallelism={{ flink.cn_stream.parallelism }} +# dns-metrics sink parallelism +dns.metric.output.parallelism={{ flink.cn_stream.parallelism }} +# relation sink parallelism +metric.entity.relation.output.parallelism={{ flink.cn_stream.parallelism }} +# dynamic attribute sink parallelism +metric.dynamic.attribute.output.parallelism={{ flink.cn_stream.parallelism }} +# subscriber-app relation sink parallelism +metric.subscriber.app.relation.output.parallelism={{ flink.cn_stream.parallelism }} +# location sink parallelism +location.metric.output.parallelism={{ flink.cn_stream.parallelism }} +# kafka consumer +kafka.input.bootstrap.servers={{ kafka_source_servers }} +session.record.completed.topic=SESSION-RECORD-CN +session.record.completed.group.id=session-record-cn-stream +# kafka consumer sasl 0:off 1:on +input.sasl.jaas.config.flag=1 +# clickhouse +clickhouse.address={{ clickhouse_servers }} +clickhouse.user=LXDp+zqdQqDIIqaDfqsKoA== +clickhouse.password=RY+0nruXpPqITsQ3ob4P7Qbd8W246+Pa +clickhouse.config.connect_timeout=30 +clickhouse.config.query_timeout=300 +# flink checkpoint 0:off 1:on +flink.enable.checkpoint.flag=0 +# api detection url +rule.full.url=http://{{ vrrp_instance.cn_ui.virtual_ipaddress }}:8090/v1/rule/detection +rule.inc.url=http://{{ vrrp_instance.cn_ui.virtual_ipaddress }}:8090/v1/rule/detection/increase +# gateway host +gateway.host={{ vrrp_instance.default.virtual_ipaddress }} +# warkmark +watermark.seconds=1 \ No newline at end of file diff --git a/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/README.md b/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/README.md index c2bba95..f195e3f 100644 --- a/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/README.md +++ b/cyber_narrator/upgrade/2024/CN-24.04/groot_stream/README.md @@ -1,4 +1,4 @@ -groot-stream version > 1.3.0 +groot-stream version > 1.4.0 etl_session_record_kafka_to_cn_kafka 需要根据部署环境确定数据源的topic是SESSION-RECORD还是SESSION-RECORD-PROCESSED \ No newline at end of file