This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
李奉超 7936bb7339 Merge branch 'develop' into 'master'
测试时设置setParallelism代码删除

See merge request galaxy/tsg_olap/app-protocol-stat-traffic-merge!12
2024-07-29 09:42:47 +00:00
2024-04-02 05:48:40 +00:00

app-protocol-stat-traffic-merge

Live Traffic Chart统计程序基于协议栈拆分多流聚合存储到协议与应用统计表中使用增量窗口计算默认统计周期1秒watermark1秒。

数据源

以下不论基于哪种计算Topic均为NETWORK-TRAFFIC-METRIC

1.app-protocol-stat-traffic-agent程序基于已关闭/过渡会话日志统计Application and Protocol Metrics聚合粒度为1秒。TSG 23.05版本)

2.功能端进行统计产生的Application and Protocol Metrics数据聚合粒度为1秒。≥TSG 23.05版本)

操作

1.过滤Measurement Name是traffic_application_protocol_stat的数据。

2.基于Tags内容进行分组统计。

3.将原始Metrics中的decoded_path和app进行拼接,例如:{"decoded_path": "ETHERNET.IPv4.TCP.https","app": "kingsoft.wps_office"}

拼接后结果ETHERNET.IPv4.TCP.https.kingsoft.wps_office

4.拆分protocol_stack_id协议树为多个节点例如ETHERNET.IPv4.TCP.https.kingsoft.wps_office每个节点ID为

  1. ETHERNET
  2. ETHERNET.IPv4
  3. ETHERNET.IPv4.TCP
  4. ETHERNET.IPv4.TCP.https
  5. ETHERNET.IPv4.TCP.https.kingsoft
  6. ETHERNET.IPv4.TCP.https.kingsoft.wps_office

4.1 为避免展示重复的协议拆分应去除Decoded Path最后一个元素与 app第一个元素重复的基础协议例如{"decoded_path": "ETHERNET.IPv4.TCP.dns","app": "dns"}

将decoded_path内最后的一个基础协议去除
  1. ETHERNET
  2. ETHERNET.IPv4
  3. ETHERNET.IPv4.TCP
  4. ETHERNET.IPv4.TCP.dns

5.app_name仅在终端节点输出。

6.输出结果时Measurement Name=application_protocol_stat。


启动

Standalone:

flink run [-p parallelism] -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties

Yarn:

flink run -t yarn-per-job -Djobmanager.memory.process.size=1024m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name=APP-PROTOCOL-STAT-TRAFFIC-MERGE -Dtaskmanager.numberOfTaskSlots=1 -d -p 3 -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties


配置项说明

配置项 类型 必填 默认值 含义
source.kafka.topic STRING Y 数据源的Kafka Topic 名称
source.kafka.props.* STRING N 数据源的Kafka 消费者连接相关参数
startup.mode STRING N group 数据源消费策略group从当前消费组的偏移量开始latest从分区最新的偏移量开始earliest从分区最早的偏移量开始
sink.kafka.topic STRING Y 数据输出的Kafka Topic 名称
sink.kafka.props.* STRING N 数据输出的Kafka 生产者连接相关参数
count.window.time INT N 5 聚合窗口大小(单位:秒)
watermark.max.orderness INT N 5 乱序数据的最大延迟时间(单位:秒)
log.failures.only BOOLEAN N false true生产者出现错误时任务失败false只记录错误信息
measurement.name STRING N application_protocol_stat 数据输出时的指标标识名称
Description
No description provided
Readme 135 KiB
Languages
Java 100%