diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/dos_sketch_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/dos_sketch_kafka_to_ndc_kafka new file mode 100644 index 0000000..0da8393 --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/dos_sketch_kafka_to_ndc_kafka @@ -0,0 +1,47 @@ +sources: + kafka_source: + type: kafka + properties: + topic: DOS-SKETCH-RECORD + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: dos_sketch_record_kafka_to_kafka + kafka.auto.offset.reset: latest + format: raw + + +sinks: + kafka_sink: + type : kafka + properties: + topic: DOS-SKETCH-RECORD + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: raw + + +application: + env: + name: dos_sketch_record_kafka_to_kafka + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [kafka_sink] + - name: kafka_sink + diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_proxy_event_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_proxy_event_kafka_to_ndc_kafka new file mode 100644 index 0000000..7ab5423 --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_proxy_event_kafka_to_ndc_kafka @@ -0,0 +1,157 @@ +sources: + kafka_source: + type: kafka + properties: + topic: PROXY-EVENT + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_proxy_event_kafka_to_ndc_kafka + kafka.auto.offset.reset: latest + format: json + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + properties: + key: value + functions: # [array of object] Function List + + - function: ASN_LOOKUP + lookup_fields: [server_ip] + output_fields: [server_asn] + parameters: + option: IP_TO_ASN + kb_name: tsg_ip_asn + + - function: ASN_LOOKUP + lookup_fields: [client_ip] + output_fields: [client_asn] + parameters: + option: IP_TO_ASN + kb_name: tsg_ip_asn + + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [processing_time] + parameters: + precision: seconds + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: DOMAIN + lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni] + output_fields: [server_domain] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + +sinks: + kafka_sink: + type : kafka + properties: + topic: PROXY-EVENT-PROCESSED + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + + +application: + + env: # [object] Environment Variables + name: etl_proxy_event_kafka_to_ndc_kafka # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [etl_processor] + - name: etl_processor + downstream: [kafka_sink] + - name: kafka_sink + + diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_session_record_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_session_record_kafka_to_ndc_kafka new file mode 100644 index 0000000..b852127 --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_session_record_kafka_to_ndc_kafka @@ -0,0 +1,157 @@ +sources: + kafka_source: + type: kafka + properties: + topic: SESSION-RECORD + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_session_record_kafka_to_ndc_kafka + kafka.auto.offset.reset: latest + format: json + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + properties: + key: value + functions: # [array of object] Function List + + - function: ASN_LOOKUP + lookup_fields: [server_ip] + output_fields: [server_asn] + parameters: + option: IP_TO_ASN + kb_name: tsg_ip_asn + + - function: ASN_LOOKUP + lookup_fields: [client_ip] + output_fields: [client_asn] + parameters: + option: IP_TO_ASN + kb_name: tsg_ip_asn + + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [processing_time] + parameters: + precision: seconds + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: DOMAIN + lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni] + output_fields: [server_domain] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + +sinks: + kafka_sink: + type : kafka + properties: + topic: SESSION-RECORD-PROCESSED + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + + +application: + + env: # [object] Environment Variables + name: etl_session_record_kafka_to_ndc_kafka # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [etl_processor] + - name: etl_processor + downstream: [kafka_sink] + - name: kafka_sink + + diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_session_record_kafka_to_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_transaction_record_kafka_to_ndc_kafka similarity index 78% rename from tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_session_record_kafka_to_kafka rename to tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_transaction_record_kafka_to_ndc_kafka index 1af1ba2..9e7c869 100644 --- a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_session_record_kafka_to_kafka +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/etl_transaction_record_kafka_to_ndc_kafka @@ -2,32 +2,31 @@ sources: kafka_source: type: kafka # fields: # [array of object] Field List, if not set, all fields(Map) will be output. + # watermark_timestamp: common_recv_time # [string] Watermark Field Name + # watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms + # watermark_lag: 60 # [number] Watermark Lag, default is 60 properties: - topic: SESSION-RECORD + topic: TRANSACTION-RECORD kafka.bootstrap.servers: "{{ kafka_source_servers }}" kafka.session.timeout.ms: 60000 kafka.max.poll.records: 3000 kafka.max.partition.fetch.bytes: 31457280 - kafka.security.protocol: SSL - kafka.ssl.endpoint.identification.algorithm: "" - kafka.ssl.keystore.location: /data/tsg/olap/flink/topology/data/keystore.jks - kafka.ssl.keystore.password: 86cf0e2ffba3f541a6c6761313e5cc7e - kafka.ssl.truststore.location: /data/tsg/olap/flink/topology/data/truststore.jks - kafka.ssl.truststore.password: 86cf0e2ffba3f541a6c6761313e5cc7e - kafka.ssl.key.password: 86cf0e2ffba3f541a6c6761313e5cc7e - #kafka.security.protocol: SASL_PLAINTEXT - #kafka.sasl.mechanism: PLAIN - #kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 - kafka.group.id: etl_session_record_kafka_to_kafka-20231221 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_transaction_record_kafka_to_ndc_kafka kafka.auto.offset.reset: latest format: json processing_pipelines: etl_processor: # [object] Processing Pipeline type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl - remove_fields: # [array of object] Field List, if not set, all fields(Map) will be output. - output_fields: # [array of object] Field List, if set, fields will be remove. + remove_fields: + output_fields: + properties: + key: value functions: # [array of object] Function List + - function: ASN_LOOKUP lookup_fields: [server_ip] output_fields: [server_asn] @@ -130,8 +129,8 @@ sinks: kafka_sink: type : kafka properties: - topic: SESSION-RECORD-PROCESSED - kafka.bootstrap.servers: {{ national_center_kafka_servers }} + topic: TRANSACTION-RECORD-PROCESSED + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" kafka.retries: 0 kafka.linger.ms: 10 kafka.request.timeout.ms: 30000 @@ -148,7 +147,7 @@ sinks: application: env: # [object] Environment Variables - name: etl_session_record_kafka_to_kafka # [string] Job Name + name: etl_transaction_record_kafka_to_ndc_kafka # [string] Job Name shade.identifier: aes pipeline: object-reuse: true # [boolean] Object Reuse, default is false diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/network_traffic_metrics_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/network_traffic_metrics_kafka_to_ndc_kafka new file mode 100644 index 0000000..66326ad --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/network_traffic_metrics_kafka_to_ndc_kafka @@ -0,0 +1,46 @@ +sources: + kafka_source: + type: kafka + properties: + topic: NETWORK-TRAFFIC-METRIC + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: network_traffic_metrics_kafka_to_ndc_kafka + kafka.auto.offset.reset: latest + format: raw + + +sinks: + kafka_sink: + type : kafka + properties: + topic: NETWORK-TRAFFIC-METRIC + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: raw + + +application: + env: + name: network_traffic_metrics_kafka_to_ndc_kafka + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [kafka_sink] + - name: kafka_sink diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/object_statistics_metric_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/object_statistics_metric_kafka_to_ndc_kafka new file mode 100644 index 0000000..c6d3eb5 --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/object_statistics_metric_kafka_to_ndc_kafka @@ -0,0 +1,47 @@ +sources: + kafka_source: + type: kafka + properties: + topic: OBJECT-STATISTICS-METRIC + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: object_statistics_metric_kafka_to_ndc_kafka + kafka.auto.offset.reset: latest + format: raw + + +sinks: + kafka_sink: + type : kafka + properties: + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: raw + + +application: + env: + name: object_statistics_metric_kafka_to_ndc_kafka + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [kafka_sink] + - name: kafka_sink + + diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/policy_rule_metrics_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/policy_rule_metrics_kafka_to_ndc_kafka new file mode 100644 index 0000000..713bd65 --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/policy_rule_metrics_kafka_to_ndc_kafka @@ -0,0 +1,48 @@ +sources: + kafka_source: + type: kafka + properties: + topic: POLICY-RULE-METRIC + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: policy_rule_metrics_kafka_to_ndc_kafka + kafka.auto.offset.reset: latest + format: raw + + +sinks: + kafka_sink: + type : kafka + properties: + topic: POLICY-RULE-METRIC + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: raw + + +application: + env: + name: policy_rule_metrics_kafka_to_ndc_kafka + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [kafka_sink] + - name: kafka_sink + + diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/pxy_exch_intermedia_cert_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/pxy_exch_intermedia_cert_kafka_to_ndc_kafka new file mode 100644 index 0000000..fefd32d --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/pxy_exch_intermedia_cert_kafka_to_ndc_kafka @@ -0,0 +1,56 @@ +sources: + kafka_source: + type: kafka + properties: + topic: PXY-EXCH-INTERMEDIA-CERT + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.ssl.keystore.location: + kafka.ssl.keystore.password: + kafka.ssl.truststore.location: + kafka.ssl.truststore.password: + kafka.ssl.key.password: + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.buffer.memory: + kafka.group.id: pxy_exch_intermedia_cert_kafka_to_ndc_kafka + kafka.auto.offset.reset: latest + kafka.max.request.size: + kafka.compression.type: none + format: raw + + +sinks: + kafka_sink: + type : kafka + properties: + topic: PXY-EXCH-INTERMEDIA-CERT + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: raw + + +application: + env: + name: pxy_exch_intermedia_cert_kafka_to_ndc_kafka + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [kafka_sink] + - name: kafka_sink + + diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/statistics_rule_metric_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/statistics_rule_metric_kafka_to_ndc_kafka new file mode 100644 index 0000000..9f247ad --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/statistics_rule_metric_kafka_to_ndc_kafka @@ -0,0 +1,48 @@ +sources: + kafka_source: + type: kafka + properties: + topic: STATISTICS-RULE-METRIC + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: statistics_rule_metric_kafka_to_ndc_kafka + kafka.auto.offset.reset: latest + format: raw + + +sinks: + kafka_sink: + type : kafka + properties: + topic: STATISTICS-RULE-METRIC + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: raw + + +application: + env: + name: statistics_rule_metric_kafka_to_ndc_kafka + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [kafka_sink] + - name: kafka_sink + + diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/troubleshooting_file_stream_record_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/troubleshooting_file_stream_record_kafka_to_ndc_kafka new file mode 100644 index 0000000..2aa7847 --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/troubleshooting_file_stream_record_kafka_to_ndc_kafka @@ -0,0 +1,48 @@ +sources: + kafka_source: + type: kafka + properties: + topic: TROUBLESHOOTING-FILE-STREAM-RECORD + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: troubleshooting_file_stream_record_kafka_to_ndc_kafka + kafka.auto.offset.reset: latest + format: raw + + +sinks: + kafka_sink: + type : kafka + properties: + topic: TROUBLESHOOTING-FILE-STREAM-RECORD + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: raw + + +application: + env: + name: troubleshooting_file_stream_record_kafka_to_ndc_kafka + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [kafka_sink] + - name: kafka_sink + + diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/voip_record_kafka_to_ndc_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/voip_record_kafka_to_ndc_kafka new file mode 100644 index 0000000..3652211 --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/datacenter_dt/voip_record_kafka_to_ndc_kafka @@ -0,0 +1,87 @@ +sources: + kafka_source: + type: kafka + properties: + topic: VOIP-RECORD + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: statistics_rule_metric_kafka_to_ndc_kafka + kafka.auto.offset.reset: latest + format: json + + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + functions: # [array of object] Function List + + - function: PATH_COMBINE + lookup_fields: [rtp_pcap_path] + output_fields: [rtp_pcap_path] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, rtp_pcap_path] + + - function: PATH_COMBINE + lookup_fields: [http_request_body] + output_fields: [http_request_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_request_body] + + - function: PATH_COMBINE + lookup_fields: [http_response_body] + output_fields: [http_response_body] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, http_response_body] + + - function: PATH_COMBINE + lookup_fields: [mail_eml_file] + output_fields: [mail_eml_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, mail_eml_file] + + - function: PATH_COMBINE + lookup_fields: [packet_capture_file] + output_fields: [packet_capture_file] + parameters: + path: [props.hos.path, props.hos.bucket.name.traffic_file, packet_capture_file] + + +sinks: + kafka_sink: + type : kafka + properties: + topic: VOIP-RECORD + kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + + + +application: + env: + name: voip_record_kafka_to_ndc_kafka + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [etl_processor] + - name: etl_processor + downstream: [kafka_sink] + - name: kafka_sink diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/dos_event_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/dos_event_kafka_to_clickhouse new file mode 100644 index 0000000..195b8ec --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/dos_event_kafka_to_clickhouse @@ -0,0 +1,42 @@ +sources: + kafka_source: + type: kafka + properties: + topic: DOS-EVENT + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: dos_event_kafka_to_clickhouse + kafka.auto.offset.reset: latest + format: json + + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + table: tsg_galaxy_v3.dos_event_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + +application: + env: + name: dos_event_kafka_to_clickhouse + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [clickhouse_sink] + - name: clickhouse_sink + + diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/etl_session_record_processed_kafka_to_cn_kafka b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/etl_session_record_processed_kafka_to_cn_kafka new file mode 100644 index 0000000..82a30a6 --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/etl_session_record_processed_kafka_to_cn_kafka @@ -0,0 +1,392 @@ +sources: + kafka_source: + type: kafka + properties: # [object] Source Properties + topic: SESSION-RECORD-PROCESSED + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_processed_session_record_kafka_to_cn_kafka + kafka.auto.offset.reset: latest + format: json + +processing_pipelines: + session_record_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + functions: # [array of object] Function List + - function: EVAL + output_fields: [ domain ] + parameters: + value_expression: server_fqdn + + - function: EVAL + output_fields: [ domain_sld ] + parameters: + value_expression: server_domain + + - function: CN_L7_PROTOCOL_AND_APP_EXTRACT + parameters: + decoded_path_field_name: decoded_path + app_transition_field_name: app_transition + l7_protocol_field_name: l7_protocol + app_field_name: app + l7_protocol: DHCP,DNS,FTP,GRE,GTP,HTTP,HTTPS,ICMP,IMAP,IMAPS,IPSEC,ISAKMP,XMPP,L2TP,LDAP,MMS,NETBIOS,NETFLOW,NTP,POP3,POP3S,RDP,PPTP,RADIUS,RTCP,RTP,RTSP,SIP,SMB,SMTP,SMTPS,SNMP,SSDP,SSH,SSL,STUN,TELNET,TFTP,OPENVPN,RTMP,TEREDO,FTPS,DTLS,SPDY,BJNP,QUIC,MDNS,Unknown TCP,Unknown UDP,Unknown Other,IKE,MAIL,SOCKS,DoH,SLP,SSL with ESNI,ISATAP,Stratum,SSL with ECH + + - function: GEOIP_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ ] + parameters: + kb_name: cn_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: client_country_region + PROVINCE: client_super_admin_area + CITY: client_admin_area + LONGITUDE: client_longitude + LATITUDE: client_latitude + ISP: client_isp + + - function: GEOIP_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ ] + parameters: + kb_name: cn_ip_location + option: IP_TO_OBJECT + geolocation_field_mapping: + COUNTRY: server_country_region + PROVINCE: server_super_admin_area + CITY: server_admin_area + LONGITUDE: server_longitude + LATITUDE: server_latitude + ISP: server_isp + + - function: ASN_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_asn ] + parameters: + option: IP_TO_ASN + kb_name: cn_ip_asn + + - function: ASN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_asn ] + parameters: + option: IP_TO_ASN + kb_name: cn_ip_asn + + - function: CN_IDC_RENTER_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_idc_renter ] + parameters: + kb_name: cn_idc_renter + + - function: CN_IDC_RENTER_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_idc_renter ] + parameters: + kb_name: cn_idc_renter + + - function: CN_LINK_DIRECTION_LOOKUP + lookup_fields: [ in_link_id ] + output_fields: [ in_link_direction ] + parameters: + kb_name: cn_link_direction + + - function: CN_LINK_DIRECTION_LOOKUP + lookup_fields: [ out_link_id ] + output_fields: [ out_link_direction ] + parameters: + kb_name: cn_link_direction + + - function: CN_FQDN_CATEGORY_LOOKUP + lookup_fields: [ domain ] + parameters: + kb_name: cn_fqdn_category + field_mapping: + NAME: domain_category_name + GROUP: domain_category_group + REPUTATION_LEVEL: domain_reputation_level + + - function: CN_ICP_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_icp_company_name ] + parameters: + kb_name: cn_fqdn_icp + + - function: CN_FQDN_WHOIS_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_whois_org ] + parameters: + kb_name: cn_fqdn_whois + + - function: CN_DNS_SERVER_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_dns_server ] + parameters: + kb_name: cn_dns_server + + - function: CN_APP_CATEGORY_LOOKUP + lookup_fields: [ app ] + parameters: + kb_name: cn_app_category + field_mapping: + CATEGORY: app_category + SUBCATEGORY: app_subcategory + COMPANY: app_company + COMPANY_CATEGORY: app_company_category + + - function: EVAL + output_fields: [ client_zone ] + parameters: + value_expression: "flags & 8 == 8 ? 'internal' : 'external'" + + - function: EVAL + output_fields: [ server_zone ] + parameters: + value_expression: "flags & 16 == 16 ? 'internal' : 'external'" + + - function: CN_IP_ZONE_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_zone ] + parameters: + kb_name: none + #kb_name: cn_internal_ip + + - function: CN_IP_ZONE_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_zone ] + parameters: + kb_name: none + #kb_name: cn_internal_ip + + - function: EVAL + output_fields: [ sent_bytes ] + parameters: + value_expression: "sent_bytes == null ? 0 : sent_bytes" + + - function: EVAL + output_fields: [ sent_pkts ] + parameters: + value_expression: "sent_pkts == null ? 0 : sent_pkts" + + - function: EVAL + output_fields: [ received_bytes ] + parameters: + value_expression: "received_bytes == null ? 0 : received_bytes" + + - function: EVAL + output_fields: [ received_pkts ] + parameters: + value_expression: "received_pkts == null ? 0 : received_pkts" + + - function: EVAL + output_fields: [ traffic_inbound_byte ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? received_bytes : traffic_inbound_byte" + + - function: EVAL + output_fields: [ traffic_outbound_byte ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? received_bytes : traffic_outbound_byte" + + - function: EVAL + output_fields: [ traffic_inbound_pkt ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? received_pkts : traffic_inbound_pkt" + + - function: EVAL + output_fields: [ traffic_outbound_pkt ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? received_pkts : traffic_outbound_pkt" + + - function: EVAL + output_fields: [ traffic_outbound_byte ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? sent_bytes : traffic_outbound_byte" + + - function: EVAL + output_fields: [ traffic_inbound_byte ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? sent_bytes : traffic_inbound_byte" + + - function: EVAL + output_fields: [ traffic_outbound_pkt ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? sent_pkts : traffic_outbound_pkt" + + - function: EVAL + output_fields: [ traffic_inbound_pkt ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? sent_pkts : traffic_inbound_pkt" + + - function: EVAL + output_fields: [ traffic_internal_byte ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'internal' ? sent_bytes + received_bytes : traffic_internal_byte" + + - function: EVAL + output_fields: [ traffic_internal_pkt ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'internal' ? sent_pkts + received_pkts : traffic_internal_pkt" + + - function: EVAL + output_fields: [ traffic_through_byte ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'external' ? sent_bytes + received_bytes : traffic_through_byte" + + - function: EVAL + output_fields: [ traffic_through_pkt ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'external' ? sent_pkts + received_pkts : traffic_through_pkt" + + - function: EVAL + output_fields: [ sessions ] + parameters: + value_expression: "1" + + - function: EVAL + output_fields: [ internal_query_num ] + parameters: + value_expression: "client_zone == 'internal' ? sessions : internal_query_num" + + - function: EVAL + output_fields: [ external_query_num ] + parameters: + value_expression: "client_zone == 'external' ? sessions : external_query_num" + + - function: CN_VPN_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_vpn_service_name ] + parameters: + kb_name: cn_vpn_learning_ip + option: IP_TO_VPN + + - function: CN_VPN_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_vpn_service_name ] + parameters: + kb_name: cn_vpn_learning_domain + option: DOMAIN_TO_VPN + + - function: CN_IOC_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_malware ] + parameters: + kb_name: cn_ioc_malware + option: IP_TO_MALWARE + + - function: CN_IOC_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_malware ] + parameters: + kb_name: cn_ioc_malware + option: DOMAIN_TO_MALWARE + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_ip_tags ] + parameters: + kb_name: cn_ip_tag_user_define + option: IP_TO_TAG + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ server_ip ] + output_fields: [ server_ip_tags ] + parameters: + kb_name: cn_ip_tag_user_define + option: IP_TO_TAG + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ domain ] + output_fields: [ domain_tags ] + parameters: + kb_name: cn_domain_tag_user_define + option: DOMAIN_TO_TAG + + - function: CN_USER_DEFINE_TAG_LOOKUP + lookup_fields: [ app ] + output_fields: [ app_tags ] + parameters: + kb_name: cn_app_tag_user_define + option: APP_TO_TAG + + - function: GENERATE_STRING_ARRAY + lookup_fields: [ client_idc_renter,client_ip_tags ] + output_fields: [ client_ip_tags ] + + - function: GENERATE_STRING_ARRAY + lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ] + output_fields: [ server_ip_tags ] + + - function: GENERATE_STRING_ARRAY + lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ] + output_fields: [ domain_tags ] + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ client_ip_tags ] + output_fields: [ client_ip_tags ] + parameters: + prefix: ip. + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ server_ip_tags ] + output_fields: [ server_ip_tags ] + parameters: + prefix: ip. + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ domain_tags ] + output_fields: [ domain_tags ] + parameters: + prefix: domain. + + - function: CN_ARRAY_ELEMENTS_PREPEND + lookup_fields: [ app_tags ] + output_fields: [ app_tags ] + parameters: + prefix: app. +postprocessing_pipelines: + remove_field_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + output_fields: [ recv_time,log_id,flags,start_timestamp_ms,end_timestamp_ms,duration_ms,decoded_as,client_ip,server_ip,client_port,server_port,app,app_transition,decoded_path,ip_protocol,l7_protocol,out_link_id,in_link_id,subscriber_id,imei,imsi,phone_number,apn,http_url,dns_rcode,dns_qname,dns_qtype,dns_rr,out_link_direction,in_link_direction,server_fqdn,server_domain,domain,domain_sld,domain_category_name,domain_category_group,domain_reputation_level,domain_icp_company_name,domain_whois_org,domain_tags,client_zone,client_country_region,client_super_admin_area,client_admin_area,client_longitude,client_latitude,client_isp,client_asn,client_ip_tags,server_zone,server_country_region,server_super_admin_area,server_admin_area,server_longitude,server_latitude,server_isp,server_asn,server_ip_tags,app_category,app_subcategory,app_company,app_company_category,app_tags,sent_pkts,sent_bytes,received_pkts,received_bytes,sessions,tcp_c2s_lost_bytes,tcp_s2c_lost_bytes,tcp_c2s_o3_pkts,tcp_s2c_o3_pkts,tcp_c2s_rtx_bytes,tcp_s2c_rtx_bytes,tcp_c2s_rtx_pkts,tcp_s2c_rtx_pkts,tcp_rtt_ms,http_response_latency_ms,ssl_handshake_latency_ms,dns_response_latency_ms,cn_internal_rule_id_list,cn_internal_ioc_type_list,traffic_inbound_byte,traffic_inbound_pkt,traffic_outbound_byte,traffic_outbound_pkt,traffic_internal_byte,traffic_internal_pkt,traffic_through_byte,traffic_through_pkt,internal_query_num,external_query_num ] + +sinks: + cn_kafka_sink: + type: kafka + properties: + topic: SESSION-RECORD-CN + kafka.bootstrap.servers: {{ national_center_cn_kafka_servers }} + kafka.retries: 0 + kafka.linger.ms: 10 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + format: json + + +application: + env: + name: etl_session_record_processed_kafka_to_cn_kafka + shade.identifier: aes + pipeline: + object-reuse: true + topology: + - name: kafka_source + downstream: [ session_record_processor ] + - name: session_record_processor + downstream: [ remove_field_processor ] + - name: remove_field_processor + downstream: [ cn_kafka_sink ] + - name: cn_kafka_sink + downstream: [ ] diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/etl_voip_record_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/etl_voip_record_kafka_to_clickhouse new file mode 100644 index 0000000..da87056 --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/etl_voip_record_kafka_to_clickhouse @@ -0,0 +1,123 @@ +sources: + kafka_source: + type: kafka + properties: + topic: VOIP-CONVERSATION-RECORD + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_voip_record_kafka_to_clickhouse + kafka.auto.offset.reset: latest + format: json + +processing_pipelines: + etl_processor: # [object] Processing Pipeline + type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + remove_fields: + output_fields: + properties: + key: value + functions: # [array of object] Function List + + - function: ASN_LOOKUP + lookup_fields: [server_ip] + output_fields: [server_asn] + parameters: + option: IP_TO_ASN + kb_name: tsg_ip_asn + + - function: ASN_LOOKUP + lookup_fields: [client_ip] + output_fields: [client_asn] + parameters: + option: IP_TO_ASN + kb_name: tsg_ip_asn + + - function: SNOWFLAKE_ID + lookup_fields: [''] + output_fields: [log_id] + parameters: + data_center_id_num: 1 + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [data_center] + filter: + parameters: + value_expression: $.tags[?(@.tag=='data_center')][0].value + + - function: JSON_EXTRACT + lookup_fields: [device_tag] + output_fields: [device_group] + filter: + parameters: + value_expression: $.tags[?(@.tag=='device_group')][0].value + + - function: CURRENT_UNIX_TIMESTAMP + output_fields: [processing_time] + parameters: + precision: seconds + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [__timestamp] + output_fields: [recv_time] + parameters: + precision: seconds + + - function: EVAL + output_fields: [ingestion_time] + parameters: + value_expression: recv_time + + - function: DOMAIN + lookup_fields: [http_host, ssl_sni, dtls_sni, quic_sni] + output_fields: [server_domain] + parameters: + option: FIRST_SIGNIFICANT_SUBDOMAIN + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_subject] + parameters: + value_field: mail_subject + charset_field: mail_subject_charset + + - function: BASE64_DECODE_TO_STRING + output_fields: [mail_attachment_name] + parameters: + value_field: mail_attachment_name + charset_field: mail_attachment_name_charset + + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + table: tsg_galaxy_v3.voip_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + + +application: + + env: # [object] Environment Variables + name: etl_voip_record_kafka_to_clickhouse # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [etl_processor] + - name: etl_processor + downstream: [clickhouse_sink] + - name: clickhouse_sink + + diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/proxy_event_processed_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/proxy_event_processed_kafka_to_clickhouse new file mode 100644 index 0000000..fc48f64 --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/proxy_event_processed_kafka_to_clickhouse @@ -0,0 +1,41 @@ +sources: + kafka_source: + type: kafka + properties: + topic: PROXY-EVENT-PROCESSED + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: proxy_event_processed_kafka_to_clickhouse + kafka.auto.offset.reset: latest + format: json + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + table: tsg_galaxy_v3.proxy_event_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + + +application: + + env: # [object] Environment Variables + name: proxy_event_processed_kafka_to_clickhouse # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [clickhouse_sink] + - name: clickhouse_sink diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/session_record_processed_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/session_record_processed_kafka_to_clickhouse index 8b0eb85..2798840 100644 --- a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/session_record_processed_kafka_to_clickhouse +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/session_record_processed_kafka_to_clickhouse @@ -1,21 +1,13 @@ sources: kafka_source: type: kafka - # fields: # [array of object] Field List, if not set, all fields(Map) will be output. properties: - topic: SESSION-RECORD - kafka.bootstrap.servers: "{{ kafka_sink_servers }}" + topic: SESSION-RECORD-PROCESSED + kafka.bootstrap.servers: "{{ kafka_source_servers }}" kafka.session.timeout.ms: 60000 kafka.max.poll.records: 3000 kafka.max.partition.fetch.bytes: 31457280 - #kafka.security.protocol: SSL - #kafka.ssl.endpoint.identification.algorithm: "" - #kafka.ssl.keystore.location: /data/tsg/olap/flink/topology/data/keystore.jks - #kafka.ssl.keystore.password: 86cf0e2ffba3f541a6c6761313e5cc7e - #kafka.ssl.truststore.location: /data/tsg/olap/flink/topology/data/truststore.jks - #kafka.ssl.truststore.password: 86cf0e2ffba3f541a6c6761313e5cc7e - #kafka.ssl.key.password: 86cf0e2ffba3f541a6c6761313e5cc7e - kafka.security.protocol: SASL_PLAINTEXT + kafka.security.protocol: SASL_PLAINTEXT kafka.sasl.mechanism: PLAIN kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 kafka.group.id: session_record_processed_kafka_to_clickhouse @@ -35,6 +27,7 @@ sinks: connection.connect_timeout: 30 connection.query_timeout: 300 + application: env: # [object] Environment Variables @@ -46,4 +39,3 @@ application: - name: kafka_source downstream: [clickhouse_sink] - name: clickhouse_sink - diff --git a/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/transaction_record_processed_kafka_to_clickhouse b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/transaction_record_processed_kafka_to_clickhouse new file mode 100644 index 0000000..9aeb79d --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/multi-datacenter-examples/national_datacenter/transaction_record_processed_kafka_to_clickhouse @@ -0,0 +1,41 @@ +sources: + kafka_source: + type: kafka + properties: + topic: TRANSACTION-RECORD-PROCESSED + kafka.bootstrap.servers: "{{ kafka_source_servers }}" + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 3000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: transaction_record_processed_kafka_to_clickhouse + kafka.auto.offset.reset: latest + format: json + +sinks: + clickhouse_sink: + type: clickhouse + properties: + host: "{{ clickhouse_servers }}" + table: tsg_galaxy_v3.transaction_record_local + batch.size: 100000 + batch.interval: 30s + connection.user: e54c9568586180eede1506eecf3574e9 + connection.password: 86cf0e2ffba3f541a6c6761313e5cc7e + connection.connect_timeout: 30 + connection.query_timeout: 300 + + +application: + + env: # [object] Environment Variables + name: transaction_record_processed_kafka_to_clickhouse # [string] Job Name + shade.identifier: aes + pipeline: + object-reuse: true # [boolean] Object Reuse, default is false + topology: + - name: kafka_source + downstream: [clickhouse_sink] + - name: clickhouse_sink