diff --git a/tsg_olap/installation/flink/groot_stream/templates/traffic_sketch_metric.yaml.j2 b/tsg_olap/installation/flink/groot_stream/templates/traffic_sketch_metric.yaml.j2 index 774dd3b..b0b8ca4 100644 --- a/tsg_olap/installation/flink/groot_stream/templates/traffic_sketch_metric.yaml.j2 +++ b/tsg_olap/installation/flink/groot_stream/templates/traffic_sketch_metric.yaml.j2 @@ -4,13 +4,12 @@ sources: properties: topic: TRAFFIC-SKETCH-METRIC kafka.bootstrap.servers: {{ kafka_source_servers }} - kafka.client.id: TRAFFIC-SKETCH-METRIC kafka.session.timeout.ms: 60000 kafka.max.poll.records: 3000 kafka.max.partition.fetch.bytes: 31457280 kafka.security.protocol: SASL_PLAINTEXT kafka.sasl.mechanism: PLAIN - kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 kafka.group.id: etl_traffic_sketch_metric kafka.auto.offset.reset: latest kafka.compression.type: none @@ -18,29 +17,16 @@ sources: processing_pipelines: etl_processor: # [object] Processing Pipeline - type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl + type: projection remove_fields: output_fields: functions: # [array of object] Function List - - - function: FLATTEN - lookup_fields: [ fields,tags ] - output_fields: [ ] + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ recv_time ] parameters: - #prefix: "" - depth: 3 - # delimiter: "." - - - function: RENAME - lookup_fields: [ '' ] - output_fields: [ '' ] - filter: - parameters: - # parent_fields: [tags] - #rename_fields: - # tags: tags - rename_expression: key =string.replace_all(key,'tags.','');key =string.replace_all(key,'fields.','');return key; - + precision: seconds + interval: 60 - function: EVAL output_fields: [ internal_ip ] parameters: @@ -49,13 +35,6 @@ processing_pipelines: output_fields: [ external_ip ] parameters: value_expression: 'direction=Outbound? server_ip : client_ip' - - - function: UNIX_TIMESTAMP_CONVERTER - lookup_fields: [ timestamp_ms ] - output_fields: [ recv_time ] - parameters: - precision: seconds - - function: SNOWFLAKE_ID lookup_fields: [ '' ] output_fields: [ log_id ] @@ -70,7 +49,6 @@ sinks: properties: topic: TRAFFIC-SKETCH-METRIC kafka.bootstrap.servers: {{ kafka_sink_servers }} - kafka.client.id: TRAFFIC-SKETCH-METRIC kafka.retries: 0 kafka.linger.ms: 10 kafka.request.timeout.ms: 30000 @@ -105,3 +83,4 @@ application: object-reuse: true # [boolean] Object Reuse, default is false {{ topology }} + diff --git a/tsg_olap/upgrade/TSG-24.08/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse b/tsg_olap/upgrade/TSG-24.08/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse index 55f2aed..939c1f9 100644 --- a/tsg_olap/upgrade/TSG-24.08/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse +++ b/tsg_olap/upgrade/TSG-24.08/groot_stream/single-cluster-examples/etl_traffic_sketch_metric_kafka_to_clickhouse @@ -16,7 +16,7 @@ sources: format: json processing_pipelines: - pre_etl_processor: # [object] Processing Pipeline + etl_processor: # [object] Processing Pipeline type: projection remove_fields: output_fields: @@ -26,72 +26,7 @@ processing_pipelines: output_fields: [ recv_time ] parameters: precision: seconds - interval: 300 - - aggregate_processor: - type: aggregate - group_by_fields: [vsys_id,device_id,device_group,data_center,ip_protocol,direction,client_ip,server_ip,server_domain,app,recv_time] - window_type: tumbling_processing_time # tumbling_event_time,sliding_processing_time,sliding_event_time - window_size: 300 - functions: - - function: NUMBER_SUM - lookup_fields: [ sessions ] - - function: NUMBER_SUM - lookup_fields: [ bytes ] - - function: NUMBER_SUM - lookup_fields: [ sent_bytes ] - - function: NUMBER_SUM - lookup_fields: [ received_bytes ] - - function: NUMBER_SUM - lookup_fields: [ pkts ] - - function: NUMBER_SUM - lookup_fields: [ sent_pkts ] - - function: NUMBER_SUM - lookup_fields: [ received_pkts ] - - function: NUMBER_SUM - lookup_fields: [ asymmetric_c2s_flows ] - - function: NUMBER_SUM - lookup_fields: [ asymmetric_s2c_flows ] - - function: NUMBER_SUM - lookup_fields: [ c2s_fragments ] - - function: NUMBER_SUM - lookup_fields: [ s2c_fragments ] - - function: NUMBER_SUM - lookup_fields: [ c2s_tcp_lost_bytes ] - - function: NUMBER_SUM - lookup_fields: [ s2c_tcp_lost_bytes ] - - function: NUMBER_SUM - lookup_fields: [ c2s_tcp_retransmitted_pkts ] - - function: NUMBER_SUM - lookup_fields: [ s2c_tcp_retransmitted_pkts ] - - function: FIRST_VALUE - lookup_fields: [ client_country ] - - function: FIRST_VALUE - lookup_fields: [ server_country ] - - function: FIRST_VALUE - lookup_fields: [ client_asn ] - - function: FIRST_VALUE - lookup_fields: [ server_asn ] - - function: FIRST_VALUE - lookup_fields: [ server_fqdn ] - - function: FIRST_VALUE - lookup_fields: [ app_category ] - - function: FIRST_VALUE - lookup_fields: [ c2s_ttl ] - - function: FIRST_VALUE - lookup_fields: [ s2c_ttl ] - - function: FIRST_VALUE - lookup_fields: [ c2s_link_id ] - - function: FIRST_VALUE - lookup_fields: [ s2c_link_id ] - - - - post_etl_processor: # [object] Processing Pipeline - type: projection - remove_fields: - output_fields: - functions: # [array of object] Function List + interval: 60 - function: EVAL output_fields: [ internal_ip ] parameters: @@ -100,7 +35,6 @@ processing_pipelines: output_fields: [ external_ip ] parameters: value_expression: 'direction=Outbound? server_ip : client_ip' - - function: SNOWFLAKE_ID lookup_fields: [ '' ] output_fields: [ log_id ] @@ -129,14 +63,9 @@ application: pipeline: object-reuse: true # [boolean] Object Reuse, default is false topology: - topology: - name: kafka_source - downstream: [pre_etl_processor] - - name: pre_etl_processor - downstream: [aggregate_processor] - - name: aggregate_processor - downstream: [post_etl_processor] - - name: post_etl_processor + downstream: [etl_processor] + - name: etl_processor downstream: [clickhouse_sink] - name: clickhouse_sink