From 82066256ec7c4b6880bd90f90f75a53d84eb638c Mon Sep 17 00:00:00 2001 From: gujinkai Date: Tue, 29 Oct 2024 10:22:16 +0800 Subject: [PATCH] CN 24.08.1 groot-stream config --- .../CN-24.08/groot-stream/24.08.0/README.md | 4 + .../etl_session_record_kafka_to_cn_kafka | 0 .../{ => 24.08.0}/grootstream.yaml | 0 .../sd_kafka_to_cn_clickhouse.yaml | 0 .../groot-stream/{ => 24.08.0}/udf.plugins | 0 .../CN-24.08/groot-stream/24.08.1/README.md | 4 + .../etl_session_record_kafka_to_cn_kafka | 408 ++++++++++++++++++ .../groot-stream/24.08.1/grootstream.yaml | 94 ++++ .../24.08.1/sd_kafka_to_cn_clickhouse.yaml | 230 ++++++++++ .../CN-24.08/groot-stream/24.08.1/udf.plugins | 27 ++ 10 files changed, 767 insertions(+) create mode 100644 cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/README.md rename cyber_narrator/upgrade/2024/CN-24.08/groot-stream/{ => 24.08.0}/etl_session_record_kafka_to_cn_kafka (100%) rename cyber_narrator/upgrade/2024/CN-24.08/groot-stream/{ => 24.08.0}/grootstream.yaml (100%) rename cyber_narrator/upgrade/2024/CN-24.08/groot-stream/{ => 24.08.0}/sd_kafka_to_cn_clickhouse.yaml (100%) rename cyber_narrator/upgrade/2024/CN-24.08/groot-stream/{ => 24.08.0}/udf.plugins (100%) create mode 100644 cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/README.md create mode 100644 cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/etl_session_record_kafka_to_cn_kafka create mode 100644 cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/grootstream.yaml create mode 100644 cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/sd_kafka_to_cn_clickhouse.yaml create mode 100644 cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/udf.plugins diff --git a/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/README.md b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/README.md new file mode 100644 index 0000000..3a164cd --- /dev/null +++ b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/README.md @@ -0,0 +1,4 @@ + +groot-stream version > 1.6.0 + +etl_session_record_kafka_to_cn_kafka 需要根据部署环境确定数据源的topic是SESSION-RECORD还是SESSION-RECORD-PROCESSED \ No newline at end of file diff --git a/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/etl_session_record_kafka_to_cn_kafka b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/etl_session_record_kafka_to_cn_kafka similarity index 100% rename from cyber_narrator/upgrade/2024/CN-24.08/groot-stream/etl_session_record_kafka_to_cn_kafka rename to cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/etl_session_record_kafka_to_cn_kafka diff --git a/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/grootstream.yaml b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/grootstream.yaml similarity index 100% rename from cyber_narrator/upgrade/2024/CN-24.08/groot-stream/grootstream.yaml rename to cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/grootstream.yaml diff --git a/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/sd_kafka_to_cn_clickhouse.yaml b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/sd_kafka_to_cn_clickhouse.yaml similarity index 100% rename from cyber_narrator/upgrade/2024/CN-24.08/groot-stream/sd_kafka_to_cn_clickhouse.yaml rename to cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/sd_kafka_to_cn_clickhouse.yaml diff --git a/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/udf.plugins b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/udf.plugins similarity index 100% rename from cyber_narrator/upgrade/2024/CN-24.08/groot-stream/udf.plugins rename to cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.0/udf.plugins diff --git a/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/README.md b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/README.md new file mode 100644 index 0000000..00ecc66 --- /dev/null +++ b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/README.md @@ -0,0 +1,4 @@ + +groot-stream version > 1.7.0 + +etl_session_record_kafka_to_cn_kafka 需要根据部署环境确定数据源的topic是SESSION-RECORD还是SESSION-RECORD-PROCESSED \ No newline at end of file diff --git a/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/etl_session_record_kafka_to_cn_kafka b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/etl_session_record_kafka_to_cn_kafka new file mode 100644 index 0000000..15af5d0 --- /dev/null +++ b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/etl_session_record_kafka_to_cn_kafka @@ -0,0 +1,408 @@ +sources: + kafka_source: + type: kafka + # fields: # [array of object] Field List, if not set, all fields(Map) will be output. + properties: # [object] Source Properties + topic: {{ tsg_olap_kafka_session_record_or_session_record_processed_topic }} # SESSION-RECORD/SESSION-RECORD-PROCESSED + kafka.bootstrap.servers: {{ tsg_olap_kafka_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_session_record_kafka_to_cn_kafka + kafka.auto.offset.reset: latest + format: json + +processing_pipelines: + session_record_processor: # [object] Processing Pipeline + type: projection + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: RENAME + parameters: + rename_fields: + client_ip_tags: ignore + server_ip_tags: ignore + + - function: SNOWFLAKE_ID + lookup_fields: [ '' ] + output_fields: [ cn_log_id ] + parameters: + data_center_id_num: 1 + + - function: EVAL + output_fields: [ log_id ] + parameters: + value_expression: "is_def(log_id) ? log_id : cn_log_id" + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ kafka_recv_time ] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ recv_time ] + parameters: + value_expression: "is_def(recv_time) ? recv_time : kafka_recv_time" + + - function: DOMAIN + lookup_fields: [ http_host, ssl_sni, dtls_sni, quic_sni ] + output_fields: [ cn_server_domain ] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + + - function: EVAL + output_fields: [ server_domain ] + parameters: + value_expression: "is_def(server_domain) ? server_domain : cn_server_domain" + + - function: EVAL + output_fields: [ domain ] + parameters: + value_expression: server_fqdn + + - function: EVAL + output_fields: [ domain_sld ] + parameters: + value_expression: server_domain + + - function: CN_L7_PROTOCOL_AND_APP_EXTRACT + parameters: + decoded_path_field_name: decoded_path + app_transition_field_name: app_transition + l7_protocol_field_name: l7_protocol + app_field_name: app + l7_protocol: DHCP,DNS,FTP,GRE,GTP,HTTP,HTTPS,ICMP,IMAP,IMAPS,IPSEC,ISAKMP,XMPP,L2TP,LDAP,MMS,NETBIOS,NETFLOW,NTP,POP3,POP3S,RDP,PPTP,RADIUS,RTCP,RTP,RTSP,SIP,SMB,SMTP,SMTPS,SNMP,SSDP,SSH,SSL,STUN,TELNET,TFTP,OPENVPN,RTMP,TEREDO,FTPS,DTLS,SPDY,BJNP,QUIC,MDNS,Unknown TCP,Unknown UDP,Unknown Other,IKE,MAIL,SOCKS,DoH,SLP,SSL with ESNI,ISATAP,Stratum,SSL with ECH + + - function: GEOIP_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ ] + parameters: + kb_name: cn_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: client_country_region + PROVINCE: client_super_admin_area + CITY: client_admin_area + LONGITUDE: client_longitude + LATITUDE: client_latitude + ISP: client_isp + + - function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ ] + parameters: + kb_name: cn_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: server_country_region + PROVINCE: server_super_admin_area + CITY: server_admin_area + LONGITUDE: server_longitude + LATITUDE: server_latitude + ISP: server_isp + + - function: ASN_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_asn ] + parameters: + option: IP_TO_ASN + kb_name: cn_ip_asn + + - function: ASN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_asn ] + parameters: + option: IP_TO_ASN + kb_name: cn_ip_asn + + - function: CN_IDC_RENTER_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_idc_renter ] + parameters: + kb_name: cn_idc_renter + + - function: CN_IDC_RENTER_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_idc_renter ] + parameters: + kb_name: cn_idc_renter + + - function: CN_LINK_DIRECTION_LOOKUP + lookup_fields: [ in_link_id ] + output_fields: [ in_link_direction ] + parameters: + kb_name: cn_link_direction + + - function: CN_LINK_DIRECTION_LOOKUP + lookup_fields: [ out_link_id ] + output_fields: [ out_link_direction ] + parameters: + kb_name: cn_link_direction + + - function: CN_FQDN_CATEGORY_LOOKUP + lookup_fields: [ domain ] + parameters: + kb_name: cn_fqdn_category + field_mapping: + NAME: domain_category_name + GROUP: domain_category_group + REPUTATION_LEVEL: domain_reputation_level + + - function: CN_ICP_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_icp_company_name ] + parameters: + kb_name: cn_fqdn_icp + + - function: CN_FQDN_WHOIS_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_whois_org ] + parameters: + kb_name: cn_fqdn_whois + + - function: CN_DNS_SERVER_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_dns_server ] + parameters: + kb_name: cn_dns_server + + - function: CN_APP_CATEGORY_LOOKUP + lookup_fields: [ app ] + parameters: + kb_name: cn_app_category + field_mapping: + CATEGORY: app_category + SUBCATEGORY: app_subcategory + COMPANY: app_company + COMPANY_CATEGORY: app_company_category + + - function: EVAL + output_fields: [ client_zone ] + parameters: + value_expression: "(flags & 8) == 8 ? 'internal' : 'external'" + + - function: EVAL + output_fields: [ server_zone ] + parameters: + value_expression: "(flags & 16) == 16 ? 'internal' : 'external'" + + - function: CN_IP_ZONE_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_zone ] + parameters: + kb_name: none + #kb_name: cn_internal_ip + + - function: CN_IP_ZONE_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_zone ] + parameters: + kb_name: none + #kb_name: cn_internal_ip + + - function: EVAL + output_fields: [ sent_bytes ] + parameters: + value_expression: "is_def(sent_bytes) ? sent_bytes : 0" + + - function: EVAL + output_fields: [ sent_pkts ] + parameters: + value_expression: "is_def(sent_pkts) ? sent_pkts : 0" + + - function: EVAL + output_fields: [ received_bytes ] + parameters: + value_expression: "is_def(received_bytes) ? received_bytes : 0" + + - function: EVAL + output_fields: [ received_pkts ] + parameters: + value_expression: "is_def(received_pkts) ? received_pkts : 0" + + - function: EVAL + output_fields: [ traffic_inbound_byte ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? received_bytes : traffic_inbound_byte" + + - function: EVAL + output_fields: [ traffic_outbound_byte ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? received_bytes : traffic_outbound_byte" + + - function: EVAL + output_fields: [ traffic_inbound_pkt ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? received_pkts : traffic_inbound_pkt" + + - function: EVAL + output_fields: [ traffic_outbound_pkt ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? received_pkts : traffic_outbound_pkt" + + - function: EVAL + output_fields: [ traffic_outbound_byte ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? sent_bytes : traffic_outbound_byte" + + - function: EVAL + output_fields: [ traffic_inbound_byte ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? sent_bytes : traffic_inbound_byte" + + - function: EVAL + output_fields: [ traffic_outbound_pkt ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? sent_pkts : traffic_outbound_pkt" + + - function: EVAL + output_fields: [ traffic_inbound_pkt ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? sent_pkts : traffic_inbound_pkt" + + - function: EVAL + output_fields: [ traffic_internal_byte ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'internal' ? sent_bytes + received_bytes : traffic_internal_byte" + + - function: EVAL + output_fields: [ traffic_internal_pkt ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'internal' ? sent_pkts + received_pkts : traffic_internal_pkt" + + - function: EVAL + output_fields: [ traffic_through_byte ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'external' ? sent_bytes + received_bytes : traffic_through_byte" + + - function: EVAL + output_fields: [ traffic_through_pkt ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'external' ? sent_pkts + received_pkts : traffic_through_pkt" + + - function: EVAL + output_fields: [ sessions ] + parameters: + value_expression: "1" + + - function: EVAL + output_fields: [ internal_query_num ] + parameters: + value_expression: "client_zone == 'internal' ? sessions : internal_query_num" + + - function: EVAL + output_fields: [ external_query_num ] + parameters: + value_expression: "client_zone == 'external' ? sessions : external_query_num" + + - function: CN_ANONYMITY_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_node_type ] + parameters: + kb_name: cn_ioc_darkweb + option: IP_TO_NODE_TYPE + + - function: CN_ANONYMITY_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_node_type ] + parameters: + kb_name: cn_ioc_darkweb + option: DOMAIN_TO_NODE_TYPE + + - function: CN_IOC_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_malware ] + parameters: + kb_name: cn_ioc_malware + option: IP_TO_MALWARE + + - function: CN_IOC_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_malware ] + parameters: + kb_name: cn_ioc_malware + option: DOMAIN_TO_MALWARE + + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_ip_tags ] + parameters: + kb_name: cn_intelligence_indicator + option: IP_TO_TAG + + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_ip_tags ] + parameters: + kb_name: cn_intelligence_indicator + option: IP_TO_TAG + + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_tags ] + parameters: + kb_name: cn_intelligence_indicator + option: DOMAIN_TO_TAG + + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP + lookup_fields: [ subscriber_id ] + output_fields: [ subscriber_tags ] + parameters: + kb_name: cn_intelligence_indicator + option: SUBSCRIBER_TO_TAG + + - function: GENERATE_STRING_ARRAY + lookup_fields: [ client_idc_renter,client_ip_tags ] + output_fields: [ client_ip_tags ] + + - function: GENERATE_STRING_ARRAY + lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_ip_tags ] + output_fields: [ server_ip_tags ] + + - function: GENERATE_STRING_ARRAY + lookup_fields: [ domain_node_type,domain_malware,domain_tags ] + output_fields: [ domain_tags ] + +postprocessing_pipelines: + remove_field_processor: # [object] Processing Pipeline + type: projection + output_fields: [ recv_time,log_id,flags,start_timestamp_ms,end_timestamp_ms,duration_ms,decoded_as,client_ip,server_ip,client_port,server_port,app,app_transition,decoded_path,ip_protocol,l7_protocol,out_link_id,in_link_id,subscriber_id,imei,imsi,phone_number,apn,subscriber_tags,http_url,dns_rcode,dns_qname,dns_qtype,dns_rr,out_link_direction,in_link_direction,server_fqdn,server_domain,domain,domain_sld,domain_category_name,domain_category_group,domain_reputation_level,domain_icp_company_name,domain_whois_org,domain_tags,client_zone,client_country_region,client_super_admin_area,client_admin_area,client_longitude,client_latitude,client_isp,client_asn,client_ip_tags,server_zone,server_country_region,server_super_admin_area,server_admin_area,server_longitude,server_latitude,server_isp,server_asn,server_ip_tags,app_category,app_subcategory,app_company,app_company_category,app_tags,sent_pkts,sent_bytes,received_pkts,received_bytes,sessions,tcp_c2s_lost_bytes,tcp_s2c_lost_bytes,tcp_c2s_o3_pkts,tcp_s2c_o3_pkts,tcp_c2s_rtx_bytes,tcp_s2c_rtx_bytes,tcp_c2s_rtx_pkts,tcp_s2c_rtx_pkts,tcp_rtt_ms,http_response_latency_ms,ssl_handshake_latency_ms,dns_response_latency_ms,cn_internal_rule_id_list,cn_internal_ioc_type_list,traffic_inbound_byte,traffic_inbound_pkt,traffic_outbound_byte,traffic_outbound_pkt,traffic_internal_byte,traffic_internal_pkt,traffic_through_byte,traffic_through_pkt,internal_query_num,external_query_num ] + +sinks: + cn_kafka_sink: + type: kafka + properties: + topic: SESSION-RECORD-CN + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + +application: + env: + name: etl_session_record_kafka_to_cn_kafka + shade.identifier: aes + pipeline: + object-reuse: true + topology: + - name: kafka_source + downstream: [ session_record_processor ] + - name: session_record_processor + downstream: [ remove_field_processor ] + - name: remove_field_processor + downstream: [ cn_kafka_sink ] + - name: cn_kafka_sink + downstream: [ ] diff --git a/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/grootstream.yaml b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/grootstream.yaml new file mode 100644 index 0000000..54acfc5 --- /dev/null +++ b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/grootstream.yaml @@ -0,0 +1,94 @@ +grootstream: + knowledge_base: + - name: cn_ip_location + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 1 + + - name: cn_ip_asn + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 2 + + - name: cn_idc_renter + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 11 + + - name: cn_link_direction + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 13 + + - name: cn_fqdn_category + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 5 + + - name: cn_fqdn_icp + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 4 + + - name: cn_fqdn_whois + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 6 + + - name: cn_dns_server + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 3 + + - name: cn_app_category + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 9 + + - name: cn_internal_ip + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 12 + + - name: cn_ioc_darkweb + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 8 + + - name: cn_ioc_malware + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 7 + + - name: cn_intelligence_indicator + fs_type: http + fs_path: http://192.168.44.55:9999/v1/knowledge_base + files: + - 16 + + - name: base_station_location + fs_type: local + fs_path: /data/cn/olap/flink/topology/groot-stream/knowledge/ + files: + - base_station_location.csv + + - name: cn_rule + fs_type: http + fs_path: http://192.168.44.54:8090 + properties: + token: 1a653ea0-d39b-4246-94b0-1ba95db4b6a7 + + properties: + scheduler.knowledge_base.update.interval.minutes: 5 \ No newline at end of file diff --git a/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/sd_kafka_to_cn_clickhouse.yaml b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/sd_kafka_to_cn_clickhouse.yaml new file mode 100644 index 0000000..fb21bb4 --- /dev/null +++ b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/sd_kafka_to_cn_clickhouse.yaml @@ -0,0 +1,230 @@ +sources: + kafka_source: + type: kafka + watermark_timestamp: ingestion_time + watermark_timestamp_unit: ms + watermark_lag: 1 + properties: # [object] Source Properties + topic: SUBSCRIBER-LOCATION-MAPPING + kafka.bootstrap.servers: {{ tsg_olap_kafka_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: sd_kafka_to_cn_clickhouse + kafka.auto.offset.reset: latest + format: json + +processing_pipelines: + session_record_processor: + type: projection + remove_fields: + output_fields: + functions: + - function: EVAL + output_fields: [ imsi ] + parameters: + value_expression: "subscriber_id" + + - function: JSON_EXTRACT + lookup_fields: [ e_cgi ] + output_fields: [ mcc ] + parameters: + value_expression: $.mcc + + - function: EVAL + output_fields: [ imei ] + parameters: + value_expression: "mei" + + - function: JSON_EXTRACT + lookup_fields: [ e_cgi ] + output_fields: [ mnc ] + parameters: + value_expression: $.mnc + + - function: JSON_EXTRACT + lookup_fields: [ e_cgi ] + output_fields: [ eci ] + parameters: + value_expression: $.eci + + - function: EVAL + output_fields: [ cell_id ] + parameters: + value_expression: "mcc + '-' + mnc + '-' + str(long(eci) >> 8) + '-' + str(long(eci) & 0xFF)" + + - function: EVAL + output_fields: [ cell_type ] + parameters: + value_expression: "1" + + - function: BASE_STATION_LOOKUP + lookup_fields: [ cell_id ] + output_fields: [ subscriber_longitude,subscriber_latitude ] + parameters: + kb_name: base_station_location + + - function: H3_CELL_LOOKUP + lookup_fields: [ subscriber_longitude,subscriber_latitude ] + output_fields: [ first_location ] + parameters: + resolution: 9 + + - function: H3_CELL_LOOKUP + lookup_fields: [ subscriber_longitude,subscriber_latitude ] + output_fields: [ second_location ] + parameters: + resolution: 8 + + - function: H3_CELL_LOOKUP + lookup_fields: [ subscriber_longitude,subscriber_latitude ] + output_fields: [ third_location ] + parameters: + resolution: 7 + + - function: EVAL + output_fields: [ data_source ] + parameters: + value_expression: "'SD'" + + - function: CN_INTELLIGENCE_INDICATOR_LOOKUP + lookup_fields: [ subscriber_id ] + output_fields: [ subscriber_tags ] + parameters: + kb_name: cn_intelligence_indicator + option: SUBSCRIBER_TO_TAG + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ subscriber_tags ] + output_fields: [ subscriber_tags ] + parameters: + prefix: subscriber. + + - function: GENERATE_STRING_ARRAY + lookup_fields: [ subscriber_tags ] + output_fields: [ entity_tags ] + + aggregate_processor: + type: aggregate + group_by_fields: [ subscriber_id ] + window_type: sliding_event_time + window_size: 60 + window_slide: 60 + window_timestamp_field: stat_time + functions: + - function: LAST_VALUE + lookup_fields: [ imei ] + output_fields: [ imei ] + + - function: LAST_VALUE + lookup_fields: [ imsi ] + output_fields: [ imsi ] + + - function: LAST_VALUE + lookup_fields: [ msisdn ] + output_fields: [ phone_number ] + + - function: LAST_VALUE + lookup_fields: [ apn ] + output_fields: [ apn ] + + - function: LAST_VALUE + lookup_fields: [ cell_id ] + output_fields: [ cell_id ] + + - function: LAST_VALUE + lookup_fields: [ cell_type ] + output_fields: [ cell_type ] + + - function: LAST_VALUE + lookup_fields: [ subscriber_longitude ] + output_fields: [ subscriber_longitude ] + + - function: LAST_VALUE + lookup_fields: [ subscriber_latitude ] + output_fields: [ subscriber_latitude ] + + - function: LAST_VALUE + lookup_fields: [ first_location ] + output_fields: [ first_location ] + + - function: LAST_VALUE + lookup_fields: [ second_location ] + output_fields: [ second_location ] + + - function: LAST_VALUE + lookup_fields: [ third_location ] + output_fields: [ third_location ] + + - function: LAST_VALUE + lookup_fields: [ data_source ] + output_fields: [ data_source ] + + - function: LAST_VALUE + lookup_fields: [ subscriber_tags ] + output_fields: [ subscriber_tags ] + + time_processor: + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + functions: + - function: EVAL + output_fields: [ stat_time ] + parameters: + value_expression: "stat_time / 1000" + +postprocessing_pipelines: + remove_field_processor: + type: projection + output_fields: [ subscriber_id,stat_time,imsi,imei,phone_number,apn,cell_id,cell_type,subscriber_longitude,subscriber_latitude,first_location,second_location,third_location,data_source,subscriber_tags ] + +sinks: + cn_location_clickhouse_sink: + type: clickhouse + properties: + host: {{ cn_olap_clickhouse_servers }} + table: cyber_narrator_galaxy.location_subscriber_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + + cn_relation_clickhouse_sink: + type: clickhouse + properties: + host: {{ cn_olap_clickhouse_servers }} + table: cyber_narrator_galaxy.raw_session_relation_subscriber_app_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: etl_sd_kafka_to_cn_clickhouse + shade.identifier: aes + pipeline: + object-reuse: true + topology: + - name: kafka_source + downstream: [ session_record_processor ] + - name: session_record_processor + downstream: [ aggregate_processor ] + - name: aggregate_processor + downstream: [ time_processor ] + - name: time_processor + downstream: [ remove_field_processor ] + - name: remove_field_processor + downstream: [ cn_location_clickhouse_sink,cn_relation_clickhouse_sink ] + - name: cn_location_clickhouse_sink + downstream: [ ] + - name: cn_relation_clickhouse_sink + downstream: [ ] \ No newline at end of file diff --git a/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/udf.plugins b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/udf.plugins new file mode 100644 index 0000000..73f2dc1 --- /dev/null +++ b/cyber_narrator/upgrade/2024/CN-24.08/groot-stream/24.08.1/udf.plugins @@ -0,0 +1,27 @@ +com.geedgenetworks.core.udf.AsnLookup +com.geedgenetworks.core.udf.Eval +com.geedgenetworks.core.udf.GenerateStringArray +com.geedgenetworks.core.udf.GeoIpLookup +com.geedgenetworks.core.udf.cn.L7ProtocolAndAppExtract +com.geedgenetworks.core.udf.cn.IdcRenterLookup +com.geedgenetworks.core.udf.cn.LinkDirectionLookup +com.geedgenetworks.core.udf.cn.FqdnCategoryLookup +com.geedgenetworks.core.udf.cn.IcpLookup +com.geedgenetworks.core.udf.cn.FqdnWhoisLookup +com.geedgenetworks.core.udf.cn.DnsServerInfoLookup +com.geedgenetworks.core.udf.cn.AppCategoryLookup +com.geedgenetworks.core.udf.cn.IpZoneLookup +com.geedgenetworks.core.udf.cn.VpnLookup +com.geedgenetworks.core.udf.cn.AnonymityLookup +com.geedgenetworks.core.udf.cn.IocLookup +com.geedgenetworks.core.udf.cn.FieldsMerge +com.geedgenetworks.core.udf.cn.ArrayElementsPrepend +com.geedgenetworks.core.udf.cn.IntelligenceIndicatorLookup +com.geedgenetworks.core.udf.SnowflakeId +com.geedgenetworks.core.udf.UnixTimestampConverter +com.geedgenetworks.core.udf.Domain +com.geedgenetworks.core.udf.cn.BaseStationLookup +com.geedgenetworks.core.udf.cn.H3CellLookup +com.geedgenetworks.core.udf.udaf.LastValue +com.geedgenetworks.core.udf.JsonExtract +com.geedgenetworks.core.udf.Rename \ No newline at end of file