TSG-22596 app-protocol-stat-traffic-merge应用迁移到groot,agg_app_protocol_traffic任务配置模板

This commit is contained in:
lifengchao
2024-09-30 10:55:44 +08:00
parent 2a1f457ffc
commit 9c420ddbfd
2 changed files with 300 additions and 0 deletions

View File

@@ -0,0 +1,150 @@
sources:
kafka_source:
type: kafka
properties:
topic: NETWORK-TRAFFIC-METRIC
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 10000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252
kafka.group.id: {{ kafka_source_group_id }} # app-protocol-merge-230510-1
kafka.auto.offset.reset: latest
format: json
filters:
name_filter:
type: aviator
properties:
expression: event.name == 'traffic_application_protocol_stat'
processing_pipelines:
pre_processor:
type: projection
functions:
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ timestamp_ms ]
output_fields: [ timestamp_ms ]
parameters:
interval: 1000
precision: milliseconds
- function: EVAL
output_fields: [name]
parameters:
value_expression: "'application_protocol_stat'"
explode_app_and_protocol_processor:
type: table
functions:
- function: PATH_UNROLL
lookup_fields: [ decoded_path, app]
output_fields: [ protocol_stack_id, app_name ]
parameters:
separator: "."
window_agg_processor:
type: aggregate
group_by_fields: [timestamp_ms, name, vsys_id, device_id, device_group, data_center, app_name, protocol_stack_id]
window_type: tumbling_processing_time
window_size: 1
window_slide: 1
mini_batch: false # false, true
functions:
- function: NUMBER_SUM
lookup_fields: [ sessions ]
output_fields: [ sessions ]
- function: NUMBER_SUM
lookup_fields: [ c2s_pkts ]
output_fields: [ c2s_pkts ]
- function: NUMBER_SUM
lookup_fields: [ s2c_pkts ]
output_fields: [ s2c_pkts ]
- function: NUMBER_SUM
lookup_fields: [ c2s_bytes ]
output_fields: [ c2s_bytes ]
- function: NUMBER_SUM
lookup_fields: [ s2c_bytes ]
output_fields: [ s2c_bytes ]
- function: NUMBER_SUM
lookup_fields: [ c2s_fragments ]
output_fields: [ c2s_fragments ]
- function: NUMBER_SUM
lookup_fields: [ s2c_fragments ]
output_fields: [ s2c_fragments ]
- function: NUMBER_SUM
lookup_fields: [ c2s_tcp_lost_bytes ]
output_fields: [ c2s_tcp_lost_bytes ]
- function: NUMBER_SUM
lookup_fields: [ s2c_tcp_lost_bytes ]
output_fields: [ s2c_tcp_lost_bytes ]
- function: NUMBER_SUM
lookup_fields: [ c2s_tcp_ooorder_pkts ]
output_fields: [ c2s_tcp_ooorder_pkts ]
- function: NUMBER_SUM
lookup_fields: [ s2c_tcp_ooorder_pkts ]
output_fields: [ s2c_tcp_ooorder_pkts ]
- function: NUMBER_SUM
lookup_fields: [ c2s_tcp_retransmitted_pkts ]
output_fields: [ c2s_tcp_retransmitted_pkts ]
- function: NUMBER_SUM
lookup_fields: [ s2c_tcp_retransmitted_pkts ]
output_fields: [ s2c_tcp_retransmitted_pkts ]
- function: NUMBER_SUM
lookup_fields: [ c2s_tcp_retransmitted_bytes ]
output_fields: [ c2s_tcp_retransmitted_bytes ]
- function: NUMBER_SUM
lookup_fields: [ s2c_tcp_retransmitted_bytes ]
output_fields: [ s2c_tcp_retransmitted_bytes ]
- function: NUMBER_SUM
lookup_fields: [ in_bytes ]
output_fields: [ in_bytes ]
- function: NUMBER_SUM
lookup_fields: [ out_bytes ]
output_fields: [ out_bytes ]
- function: NUMBER_SUM
lookup_fields: [ in_pkts ]
output_fields: [ in_pkts ]
- function: NUMBER_SUM
lookup_fields: [ out_pkts ]
output_fields: [ out_pkts ]
sinks:
kafka_sink:
type: kafka
properties:
topic: APPLICATION-PROTOCOL-METRIC
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 100
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252
format: json
log.failures.only: true
application:
env:
name: {{ job_name }} # agg_app_protocol_traffic_kafka_to_kafka
shade.identifier: aes
pipeline:
object-reuse: true
topology:
- name: kafka_source
downstream: [name_filter]
- name: name_filter
downstream: [pre_processor]
- name: pre_processor
downstream: [explode_app_and_protocol_processor]
- name: explode_app_and_protocol_processor
downstream: [window_agg_processor]
- name: window_agg_processor
downstream: [kafka_sink]
- name: kafka_sink
downstream: []

View File

@@ -0,0 +1,150 @@
sources:
kafka_source:
type: kafka
properties:
topic: NETWORK-TRAFFIC-METRIC
kafka.bootstrap.servers: {{ kafka_source_servers }}
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 10000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252
kafka.group.id: {{ kafka_source_group_id }} # app-protocol-merge-230510-1
kafka.auto.offset.reset: latest
format: json
filters:
name_filter:
type: aviator
properties:
expression: event.name == 'traffic_application_protocol_stat'
processing_pipelines:
pre_processor:
type: projection
functions:
- function: UNIX_TIMESTAMP_CONVERTER
lookup_fields: [ timestamp_ms ]
output_fields: [ timestamp_ms ]
parameters:
interval: 1000
precision: milliseconds
- function: EVAL
output_fields: [name]
parameters:
value_expression: "'application_protocol_stat'"
explode_app_and_protocol_processor:
type: table
functions:
- function: PATH_UNROLL
lookup_fields: [ decoded_path, app]
output_fields: [ protocol_stack_id, app_name ]
parameters:
separator: "."
window_agg_processor:
type: aggregate
group_by_fields: [timestamp_ms, name, vsys_id, device_id, device_group, data_center, app_name, protocol_stack_id]
window_type: tumbling_processing_time
window_size: 1
window_slide: 1
mini_batch: false # false, true
functions:
- function: NUMBER_SUM
lookup_fields: [ sessions ]
output_fields: [ sessions ]
- function: NUMBER_SUM
lookup_fields: [ c2s_pkts ]
output_fields: [ c2s_pkts ]
- function: NUMBER_SUM
lookup_fields: [ s2c_pkts ]
output_fields: [ s2c_pkts ]
- function: NUMBER_SUM
lookup_fields: [ c2s_bytes ]
output_fields: [ c2s_bytes ]
- function: NUMBER_SUM
lookup_fields: [ s2c_bytes ]
output_fields: [ s2c_bytes ]
- function: NUMBER_SUM
lookup_fields: [ c2s_fragments ]
output_fields: [ c2s_fragments ]
- function: NUMBER_SUM
lookup_fields: [ s2c_fragments ]
output_fields: [ s2c_fragments ]
- function: NUMBER_SUM
lookup_fields: [ c2s_tcp_lost_bytes ]
output_fields: [ c2s_tcp_lost_bytes ]
- function: NUMBER_SUM
lookup_fields: [ s2c_tcp_lost_bytes ]
output_fields: [ s2c_tcp_lost_bytes ]
- function: NUMBER_SUM
lookup_fields: [ c2s_tcp_ooorder_pkts ]
output_fields: [ c2s_tcp_ooorder_pkts ]
- function: NUMBER_SUM
lookup_fields: [ s2c_tcp_ooorder_pkts ]
output_fields: [ s2c_tcp_ooorder_pkts ]
- function: NUMBER_SUM
lookup_fields: [ c2s_tcp_retransmitted_pkts ]
output_fields: [ c2s_tcp_retransmitted_pkts ]
- function: NUMBER_SUM
lookup_fields: [ s2c_tcp_retransmitted_pkts ]
output_fields: [ s2c_tcp_retransmitted_pkts ]
- function: NUMBER_SUM
lookup_fields: [ c2s_tcp_retransmitted_bytes ]
output_fields: [ c2s_tcp_retransmitted_bytes ]
- function: NUMBER_SUM
lookup_fields: [ s2c_tcp_retransmitted_bytes ]
output_fields: [ s2c_tcp_retransmitted_bytes ]
- function: NUMBER_SUM
lookup_fields: [ in_bytes ]
output_fields: [ in_bytes ]
- function: NUMBER_SUM
lookup_fields: [ out_bytes ]
output_fields: [ out_bytes ]
- function: NUMBER_SUM
lookup_fields: [ in_pkts ]
output_fields: [ in_pkts ]
- function: NUMBER_SUM
lookup_fields: [ out_pkts ]
output_fields: [ out_pkts ]
sinks:
kafka_sink:
type: kafka
properties:
topic: APPLICATION-PROTOCOL-METRIC
kafka.bootstrap.servers: {{ kafka_sink_servers }}
kafka.retries: 0
kafka.linger.ms: 100
kafka.request.timeout.ms: 30000
kafka.batch.size: 262144
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
kafka.security.protocol: SASL_PLAINTEXT
kafka.sasl.mechanism: PLAIN
kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a80bea937958aaa485c2acc2b475603495a23eb59f055e037c0b186acb22886bd0275ca91f1633441d9943e7962942252
format: json
log.failures.only: true
application:
env:
name: {{ job_name }} # agg_app_protocol_traffic_kafka_to_kafka
shade.identifier: aes
pipeline:
object-reuse: true
topology:
- name: kafka_source
downstream: [name_filter]
- name: name_filter
downstream: [pre_processor]
- name: pre_processor
downstream: [explode_app_and_protocol_processor]
- name: explode_app_and_protocol_processor
downstream: [window_agg_processor]
- name: window_agg_processor
downstream: [kafka_sink]
- name: kafka_sink
downstream: []