diff --git a/tsg_olap/installation/flink/groot_stream/templates/agg_app_protocol_traffic.yaml.j2 b/tsg_olap/installation/flink/groot_stream/templates/agg_app_protocol_traffic.yaml.j2 new file mode 100644 index 0000000..a49c237 --- /dev/null +++ b/tsg_olap/installation/flink/groot_stream/templates/agg_app_protocol_traffic.yaml.j2 @@ -0,0 +1,150 @@ +sources: + kafka_source: + type: kafka + properties: + topic: NETWORK-TRAFFIC-METRIC + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 10000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} # app-protocol-merge-230510-1 + kafka.auto.offset.reset: latest + format: json + +filters: + name_filter: + type: aviator + properties: + expression: event.name == 'traffic_application_protocol_stat' + +processing_pipelines: + pre_processor: + type: projection + functions: + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ timestamp_ms ] + parameters: + interval: 1000 + precision: milliseconds + - function: EVAL + output_fields: [name] + parameters: + value_expression: "'application_protocol_stat'" + + explode_app_and_protocol_processor: + type: table + functions: + - function: PATH_UNROLL + lookup_fields: [ decoded_path, app] + output_fields: [ protocol_stack_id, app_name ] + parameters: + separator: "." + + window_agg_processor: + type: aggregate + group_by_fields: [timestamp_ms, name, vsys_id, device_id, device_group, data_center, app_name, protocol_stack_id] + window_type: tumbling_processing_time + window_size: 1 + window_slide: 1 + mini_batch: false # false, true + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + output_fields: [ sessions ] + - function: NUMBER_SUM + lookup_fields: [ c2s_pkts ] + output_fields: [ c2s_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_pkts ] + output_fields: [ s2c_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_bytes ] + output_fields: [ c2s_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_bytes ] + output_fields: [ s2c_bytes ] + - function: NUMBER_SUM + lookup_fields: [ c2s_fragments ] + output_fields: [ c2s_fragments ] + - function: NUMBER_SUM + lookup_fields: [ s2c_fragments ] + output_fields: [ s2c_fragments ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_lost_bytes ] + output_fields: [ c2s_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_lost_bytes ] + output_fields: [ s2c_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_ooorder_pkts ] + output_fields: [ c2s_tcp_ooorder_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_ooorder_pkts ] + output_fields: [ s2c_tcp_ooorder_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_retransmitted_pkts ] + output_fields: [ c2s_tcp_retransmitted_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_retransmitted_pkts ] + output_fields: [ s2c_tcp_retransmitted_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_retransmitted_bytes ] + output_fields: [ c2s_tcp_retransmitted_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_retransmitted_bytes ] + output_fields: [ s2c_tcp_retransmitted_bytes ] + - function: NUMBER_SUM + lookup_fields: [ in_bytes ] + output_fields: [ in_bytes ] + - function: NUMBER_SUM + lookup_fields: [ out_bytes ] + output_fields: [ out_bytes ] + - function: NUMBER_SUM + lookup_fields: [ in_pkts ] + output_fields: [ in_pkts ] + - function: NUMBER_SUM + lookup_fields: [ out_pkts ] + output_fields: [ out_pkts ] + +sinks: + kafka_sink: + type: kafka + properties: + topic: APPLICATION-PROTOCOL-METRIC + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 100 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + log.failures.only: true + +application: + env: + name: {{ job_name }} # agg_app_protocol_traffic_kafka_to_kafka + shade.identifier: aes + pipeline: + object-reuse: true + topology: + - name: kafka_source + downstream: [name_filter] + - name: name_filter + downstream: [pre_processor] + - name: pre_processor + downstream: [explode_app_and_protocol_processor] + - name: explode_app_and_protocol_processor + downstream: [window_agg_processor] + - name: window_agg_processor + downstream: [kafka_sink] + - name: kafka_sink + downstream: [] \ No newline at end of file diff --git a/tsg_olap/upgrade/TSG-24.10/groot_stream/templates/agg_app_protocol_traffic.yaml.j2 b/tsg_olap/upgrade/TSG-24.10/groot_stream/templates/agg_app_protocol_traffic.yaml.j2 new file mode 100644 index 0000000..a49c237 --- /dev/null +++ b/tsg_olap/upgrade/TSG-24.10/groot_stream/templates/agg_app_protocol_traffic.yaml.j2 @@ -0,0 +1,150 @@ +sources: + kafka_source: + type: kafka + properties: + topic: NETWORK-TRAFFIC-METRIC + kafka.bootstrap.servers: {{ kafka_source_servers }} + kafka.session.timeout.ms: 60000 + kafka.max.poll.records: 10000 + kafka.max.partition.fetch.bytes: 31457280 + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + kafka.group.id: {{ kafka_source_group_id }} # app-protocol-merge-230510-1 + kafka.auto.offset.reset: latest + format: json + +filters: + name_filter: + type: aviator + properties: + expression: event.name == 'traffic_application_protocol_stat' + +processing_pipelines: + pre_processor: + type: projection + functions: + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ timestamp_ms ] + output_fields: [ timestamp_ms ] + parameters: + interval: 1000 + precision: milliseconds + - function: EVAL + output_fields: [name] + parameters: + value_expression: "'application_protocol_stat'" + + explode_app_and_protocol_processor: + type: table + functions: + - function: PATH_UNROLL + lookup_fields: [ decoded_path, app] + output_fields: [ protocol_stack_id, app_name ] + parameters: + separator: "." + + window_agg_processor: + type: aggregate + group_by_fields: [timestamp_ms, name, vsys_id, device_id, device_group, data_center, app_name, protocol_stack_id] + window_type: tumbling_processing_time + window_size: 1 + window_slide: 1 + mini_batch: false # false, true + functions: + - function: NUMBER_SUM + lookup_fields: [ sessions ] + output_fields: [ sessions ] + - function: NUMBER_SUM + lookup_fields: [ c2s_pkts ] + output_fields: [ c2s_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_pkts ] + output_fields: [ s2c_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_bytes ] + output_fields: [ c2s_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_bytes ] + output_fields: [ s2c_bytes ] + - function: NUMBER_SUM + lookup_fields: [ c2s_fragments ] + output_fields: [ c2s_fragments ] + - function: NUMBER_SUM + lookup_fields: [ s2c_fragments ] + output_fields: [ s2c_fragments ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_lost_bytes ] + output_fields: [ c2s_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_lost_bytes ] + output_fields: [ s2c_tcp_lost_bytes ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_ooorder_pkts ] + output_fields: [ c2s_tcp_ooorder_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_ooorder_pkts ] + output_fields: [ s2c_tcp_ooorder_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_retransmitted_pkts ] + output_fields: [ c2s_tcp_retransmitted_pkts ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_retransmitted_pkts ] + output_fields: [ s2c_tcp_retransmitted_pkts ] + - function: NUMBER_SUM + lookup_fields: [ c2s_tcp_retransmitted_bytes ] + output_fields: [ c2s_tcp_retransmitted_bytes ] + - function: NUMBER_SUM + lookup_fields: [ s2c_tcp_retransmitted_bytes ] + output_fields: [ s2c_tcp_retransmitted_bytes ] + - function: NUMBER_SUM + lookup_fields: [ in_bytes ] + output_fields: [ in_bytes ] + - function: NUMBER_SUM + lookup_fields: [ out_bytes ] + output_fields: [ out_bytes ] + - function: NUMBER_SUM + lookup_fields: [ in_pkts ] + output_fields: [ in_pkts ] + - function: NUMBER_SUM + lookup_fields: [ out_pkts ] + output_fields: [ out_pkts ] + +sinks: + kafka_sink: + type: kafka + properties: + topic: APPLICATION-PROTOCOL-METRIC + kafka.bootstrap.servers: {{ kafka_sink_servers }} + kafka.retries: 0 + kafka.linger.ms: 100 + kafka.request.timeout.ms: 30000 + kafka.batch.size: 262144 + kafka.buffer.memory: 134217728 + kafka.max.request.size: 10485760 + kafka.compression.type: snappy + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252 + format: json + log.failures.only: true + +application: + env: + name: {{ job_name }} # agg_app_protocol_traffic_kafka_to_kafka + shade.identifier: aes + pipeline: + object-reuse: true + topology: + - name: kafka_source + downstream: [name_filter] + - name: name_filter + downstream: [pre_processor] + - name: pre_processor + downstream: [explode_app_and_protocol_processor] + - name: explode_app_and_protocol_processor + downstream: [window_agg_processor] + - name: window_agg_processor + downstream: [kafka_sink] + - name: kafka_sink + downstream: [] \ No newline at end of file