diff --git a/README.md b/README.md
index b888ea8..8a1bc55 100644
--- a/README.md
+++ b/README.md
@@ -1,22 +1,65 @@
# app-protocol-stat-traffic-merge
-Statistics job for the Live Traffic Chart: splits the protocol stack into nodes and aggregates multiple flows, stores the results into the protocol and application statistics tables, and uses incremental window computation with a 15-second period.
+Statistics job for the Live Traffic Chart: splits the protocol stack into nodes and aggregates multiple flows, stores the results into the protocol and application statistics tables, and uses incremental window computation with a 5-second statistics period and a 5-second watermark.
## Data Sources
-Regardless of which computation is used, the Topic is always NETWORK-TRAFFIC-METRICS
-### 1.The app-protocol-stat-traffic-agent program computes Application and Protocol Metrics from closed/transitional session logs, at a 1-second aggregation granularity.
-### 2.Application and Protocol Metrics data produced by device-side statistics, at a 1-second aggregation granularity.
-## Statistics Operations
-### 1.Filter data whose name is traffic_application_protocol_stat.
+Regardless of which computation is used, the Topic is always NETWORK-TRAFFIC-METRICS
+
+### 1.The app-protocol-stat-traffic-agent program computes Application and Protocol Metrics from closed/transitional session logs, at a 1-second aggregation granularity. (TSG 23.05)
+
+### 2.Application and Protocol Metrics data produced by device-side statistics, at a 1-second aggregation granularity. (TSG ≥ 23.05)
+
+## Operations
+
+### 1.Filter data whose Measurement Name is traffic_application_protocol_stat.
+
### 2.Group and aggregate based on the Tags content.
+
### 3.Split the protocol_stack_id protocol tree into multiple nodes (see the sketch after this list)
+
#### For example, ETHERNET.IPv4.TCP.https.kingsoft.wps_office produces these node IDs:
+
##### ETHERNET
+
##### ETHERNET.IPv4
+
##### ETHERNET.IPv4.TCP
+
##### ETHERNET.IPv4.TCP.https
+
##### ETHERNET.IPv4.TCP.https.kingsoft
+
##### ETHERNET.IPv4.TCP.https.kingsoft.wps_office
+
### 4.app_name is output only at the leaf node.
+
### 5.When outputting results, Measurement Name=application_protocol_stat.
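+
+The splitting rule in step 3 can be illustrated with a minimal, self-contained sketch (the class name is illustrative; this mirrors the logic of ResultFlatMap, not the production operator itself):
+
+```java
+public class ProtocolSplitDemo {
+    public static void main(String[] args) {
+        String protocolStackId = "ETHERNET.IPv4.TCP.https.kingsoft.wps_office";
+        StringBuilder prefix = new StringBuilder();
+        for (String node : protocolStackId.split("\\.")) { // "." must be escaped in the regex
+            if (prefix.length() > 0) {
+                prefix.append('.');
+            }
+            prefix.append(node);
+            System.out.println(prefix); // one statistics record is emitted per node ID
+        }
+    }
+}
+```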
+
+
+
+## Startup
+
+Standalone:
+
+`flink run [-p parallelism] -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties`
+
+Yarn:
+
+`flink run -t yarn-per-job -Djobmanager.memory.process.size=1024m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name=APP-PROTOCOL-STAT-TRAFFIC-MERGE -Dtaskmanager.numberOfTaskSlots=1 -d -p 3 -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties`
+
+
+
+## Configuration Options
+
+|Option|Type|Required|Default|Description|
+|--|--|--|--|--|
+|source.kafka.topic|STRING|Y||Kafka topic name of the data source|
+|source.kafka.props.*|STRING|N||Kafka consumer connection properties for the data source|
+|startup.mode|STRING|N|group|Consumer startup strategy (group: start from the consumer group's committed offsets; latest: start from each partition's latest offset; earliest: start from each partition's earliest offset)|
+|sink.kafka.topic|STRING|Y||Kafka topic name for the output data|
+|sink.kafka.props.*|STRING|N||Kafka producer connection properties for the output|
+|count.window.time|INT|N|5|Aggregation window size (unit: seconds)|
+|watermark.max.orderness|INT|N|5|Maximum delay allowed for out-of-order data (unit: seconds)|
+|log.failures.only|BOOLEAN|N|false|Whether producer errors fail the job or are only logged|
+|measurement.name|STRING|N|application_protocol_stat|Measurement name used to identify output data|
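+
+For reference, a minimal `app.properties` might look like the following (the broker address, group id, and sink topic below are placeholders):
+
+```properties
+source.kafka.topic=NETWORK-TRAFFIC-METRICS
+source.kafka.props.bootstrap.servers=broker-1:9094
+source.kafka.props.group.id=app-protocol-merge
+sink.kafka.topic=APP-PROTOCOL-RESULT
+sink.kafka.props.bootstrap.servers=broker-1:9094
+count.window.time=5
+watermark.max.orderness=5
+```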
diff --git a/pom.xml b/pom.xml
index 2568e90..f3e7a70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
app-protocol-stat-traffic-merge
- 1.6
+ 2.0.0
app-protocol-stat-traffic-merge
http://www.example.com
diff --git a/properties/default_config.properties b/properties/default_config.properties
deleted file mode 100644
index 89d7987..0000000
--- a/properties/default_config.properties
+++ /dev/null
@@ -1,46 +0,0 @@
-#====================Kafka KafkaConsumer====================#
-#kafka source connection timeout
-session.timeout.ms=60000
-
-#kafka source poll
-max.poll.records=5000
-
-#kafka source poll bytes
-max.partition.fetch.bytes=31457280
-
-#====================Kafka KafkaProducer====================#
-#Number of retries for the producer
-retries=0
-
-#Maximum time a batch may wait after creation before it is sent, whether or not it is full
-linger.ms=10
-
-#If no response arrives before the timeout, the client resends the request when necessary
-request.timeout.ms=30000
-
-#Producers send in batches; batch size, default: 16384
-batch.size=262144
-
-#Buffer size used by the producer to cache messages
-#128M
-buffer.memory=134217728
-
-#Maximum size of a single request sent to the Kafka server, default 1048576
-#10M
-max.request.size=10485760
-
-#Producer compression mode: none or snappy
-producer.kafka.compression.type=snappy
-
-#Producer acks
-producer.ack=1
-
-#====================kafka default====================#
-#Kafka SASL username (encrypted)
-kafka.user=nsyGpHKGFA4KW0zro9MDdw==
-
-#Kafka SASL/SSL password (encrypted)
-kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
-
-#====================Topology Default====================#
-measurement.name=application_protocol_stat
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index e401d29..82a236c 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,42 +1,27 @@
-#--------------------------------Address configuration------------------------------#
-#Management Kafka address
-source.kafka.servers=192.168.44.12:9094
-
-#Management-side output Kafka address
-sink.kafka.servers=192.168.44.12:9094
-
-#--------------------------------HTTP------------------------------#
-#Kafka certificate directory
-tools.library=D:\\workerspace\\dat
-
-#--------------------------------Kafka consumer group info------------------------------#
#Kafka topic for incoming data
-source.kafka.topic=APP-PROTOCOL-TEST
+source.kafka.topic=NETWORK-TRAFFIC-METRICS
+
+source.kafka.props.bootstrap.servers=192.168.44.12:9094
+
+source.kafka.props.group.id=app-protocol-merge-231109-1
+
+source.kafka.props.security.protocol=SASL_PLAINTEXT
+
+source.kafka.props.sasl.mechanism=PLAIN
+
+source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
#Output topic for result data
sink.kafka.topic=APP-PROTOCOL-TEST-RESULT
-#Consumer group for the source topic; stores this job's consumed offsets (can be named after the topology) so the next read starts where the last one left off;
-group.id=livecharts-test-20230423-2
+sink.kafka.props.bootstrap.servers=192.168.44.12:9094
-#--------------------------------Topology configuration------------------------------#
-#Consumer parallelism
-source.parallelism=1
+sink.kafka.props.security.protocol=SASL_PLAINTEXT
-#Map function parallelism
-parse.parallelism=1
+sink.kafka.props.sasl.mechanism=PLAIN
-#First-stage window computation parallelism
-window.parallelism=1
+sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
-#Producer parallelism
-sink.parallelism=1
-
-#Pre-aggregation window time
count.window.time=5
-#Watermark delay
-watermark.max.orderness=5
-
-#Data source: firewall or agent
-metrics.data.source=firewall
+watermark.max.orderness=5
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
deleted file mode 100644
index 24702a5..0000000
--- a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package com.zdjizhi.common.config;
-
-
-import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
-
-/**
- * @author Administrator
- */
-public class GlobalConfig {
-
- private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
-
- static {
- encryptor.setPassword("galaxy");
- }
-
- /**
- * Protocol separator; must be escaped because it is used as a regex
- */
- public static final String PROTOCOL_SPLITTER = "\\.";
-
-
- /**
- * System
- */
- public static final Integer SOURCE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "source.parallelism");
- public static final String MEASUREMENT_NAME = GlobalConfigLoad.getStringProperty(1, "measurement.name");
- public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism");
- public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism");
- public static final Integer WARTERMARK_MAX_ORDERNESS = GlobalConfigLoad.getIntProperty(0, "watermark.max.orderness");
- public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time");
- public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library");
- public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism");
- public static final String METICS_DATA_SOURCE = GlobalConfigLoad.getStringProperty(0, "metrics.data.source");
-
- /**
- * Kafka common
- */
- public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.user"));
- public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.pin"));
-
-
- /**
- * kafka sink config
- */
- public static final String SINK_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "sink.kafka.servers");
- public static final String SINK_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "sink.kafka.topic");
- public static final String PRODUCER_ACK = GlobalConfigLoad.getStringProperty(1, "producer.ack");
- public static final String RETRIES = GlobalConfigLoad.getStringProperty(1, "retries");
- public static final String LINGER_MS = GlobalConfigLoad.getStringProperty(1, "linger.ms");
- public static final Integer REQUEST_TIMEOUT_MS = GlobalConfigLoad.getIntProperty(1, "request.timeout.ms");
- public static final Integer BATCH_SIZE = GlobalConfigLoad.getIntProperty(1, "batch.size");
- public static final Integer BUFFER_MEMORY = GlobalConfigLoad.getIntProperty(1, "buffer.memory");
- public static final Integer MAX_REQUEST_SIZE = GlobalConfigLoad.getIntProperty(1, "max.request.size");
-
-
- /**
- * kafka source config
- */
- public static final String SOURCE_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "source.kafka.servers");
- public static final String SOURCE_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "source.kafka.topic");
- public static final String GROUP_ID = GlobalConfigLoad.getStringProperty(0, "group.id");
- public static final String SESSION_TIMEOUT_MS = GlobalConfigLoad.getStringProperty(1, "session.timeout.ms");
- public static final String MAX_POLL_RECORDS = GlobalConfigLoad.getStringProperty(1, "max.poll.records");
- public static final String MAX_PARTITION_FETCH_BYTES = GlobalConfigLoad.getStringProperty(1, "max.partition.fetch.bytes");
-
-
- /**
- * Kafka rate-limiting config - 20201117
- */
- public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = GlobalConfigLoad.getStringProperty(1, "producer.kafka.compression.type");
-
-
-}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java b/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java
deleted file mode 100644
index 0ae91e5..0000000
--- a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.zdjizhi.common.config;
-
-import com.zdjizhi.utils.StringUtil;
-
-import java.io.IOException;
-import java.util.Locale;
-import java.util.Properties;
-
-
-/**
- * @author Administrator
- */
-
-public final class GlobalConfigLoad {
-
- private static Properties propDefault = new Properties();
- private static Properties propService = new Properties();
-
-
- static String getStringProperty(Integer type, String key) {
- if (type == 0) {
- return propService.getProperty(key);
- } else if (type == 1) {
- return propDefault.getProperty(key);
- } else {
- return null;
- }
-
- }
-
- static Integer getIntProperty(Integer type, String key) {
- if (type == 0) {
- return Integer.parseInt(propService.getProperty(key));
- } else if (type == 1) {
- return Integer.parseInt(propDefault.getProperty(key));
- } else {
- return null;
- }
- }
-
- public static Long getLongProperty(Integer type, String key) {
- if (type == 0) {
- return Long.parseLong(propService.getProperty(key));
- } else if (type == 1) {
- return Long.parseLong(propDefault.getProperty(key));
- } else {
- return null;
- }
- }
-
- public static Boolean getBooleanProperty(Integer type, String key) {
- if (type == 0) {
- return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
- } else if (type == 1) {
- return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
- } else {
- return null;
- }
- }
-
- static {
- try {
- propService.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
- propDefault.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("default_config.properties"));
- } catch (IOException | RuntimeException e) {
- propDefault = null;
- propService = null;
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
new file mode 100644
index 0000000..738537c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
@@ -0,0 +1,72 @@
+package com.zdjizhi.common.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Configuration options for the merge application.
+ *
+ * @author chaoc
+ * @since 1.0
+ */
+public class MergeConfigs {
+
+ /**
+ * The prefix for Kafka properties used in the source.
+ */
+ public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
+
+ /**
+ * The prefix for Kafka properties used in the sink.
+ */
+ public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
+
+
+ public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
+ ConfigOptions.key("source.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the source.");
+
+
+ public static final ConfigOption<String> SINK_KAFKA_TOPIC =
+ ConfigOptions.key("sink.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the sink.");
+
+
+ public static final ConfigOption<Integer> COUNT_WINDOW_TIME =
+ ConfigOptions.key("count.window.time")
+ .intType()
+ .defaultValue(5)
+ .withDescription("The aggregation window size, in seconds.");
+
+
+ public static final ConfigOption<Integer> WATERMARK_MAX_ORDERNESS =
+ ConfigOptions.key("watermark.max.orderness")
+ .intType()
+ .defaultValue(5)
+ .withDescription("The maximum out-of-orderness allowed for the watermark, in seconds.");
+
+
+ public static final ConfigOption<String> STARTUP_MODE =
+ ConfigOptions.key("startup.mode")
+ .stringType()
+ .defaultValue("group")
+ .withDescription("The startup offset mode for the consumer (group, latest, or earliest).");
+
+
+ public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
+ ConfigOptions.key("log.failures.only")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Defines whether the producer should fail on errors, or only log them.");
+
+
+ public static final ConfigOption<String> MEASUREMENT_NAME =
+ ConfigOptions.key("measurement.name")
+ .stringType()
+ .defaultValue("application_protocol_stat")
+ .withDescription("The measurement name attached to output records.");
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java b/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java
new file mode 100644
index 0000000..4da7616
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java
@@ -0,0 +1,44 @@
+package com.zdjizhi.common.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Properties;
+
+/**
+ * A wrapper around the Flink `Configuration` that provides utility methods for handling
+ * properties with a specific prefix. This class allows retrieving properties that start with the
+ * given `prefix` and converts them into a `java.util.Properties` object.
+ *
+ * @author chaoc
+ * @since 1.0
+ */
+
+public class MergeConfiguration {
+ private final Configuration config;
+
+ public MergeConfiguration(final Configuration config) {
+ this.config = config;
+ }
+
+ /**
+ * Retrieves properties from the underlying `Configuration` instance that start with the specified
+ * `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
+ *
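+ * <p>For example, with prefix {@code "source.kafka.props."}, the key
+ * {@code source.kafka.props.bootstrap.servers} is returned as {@code bootstrap.servers}.
+ *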
+ * @param prefix The prefix to filter properties.
+ * @return A `java.util.Properties` object containing the properties with the specified prefix.
+ */
+ public Properties getProperties(final String prefix) {
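+ // A null prefix means no filtering: return every configuration entry as-is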
+ if (prefix == null) {
+ final Properties props = new Properties();
+ props.putAll(config.toMap());
+ return props;
+ }
+ return config.toMap()
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getKey().startsWith(prefix))
+ .collect(Properties::new, (props, e) ->
+ props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
+ Properties::putAll);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index 5f046c9..116c45a 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -2,7 +2,8 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
+import com.zdjizhi.common.config.MergeConfigs;
+import com.zdjizhi.common.config.MergeConfiguration;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
@@ -25,6 +26,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
+import static com.zdjizhi.common.config.MergeConfigs.*;
+
/**
* @author qidaijie
* @Package com.zdjizhi.topology
@@ -36,36 +39,51 @@ public class ApplicationProtocolTopology {
public static void main(String[] args) {
try {
+
+ // param check
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Error: properties file path not provided. " +
+ "\nUsage: flink run -c <main-class> <jar-file> app.properties");
+ }
+
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+ ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+ final Configuration config = tool.getConfiguration();
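+ // Register the parsed configuration as global job parameters so operators (e.g. MergeCountWindow#open) can read it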
+ environment.getConfig().setGlobalJobParameters(config);
+ final MergeConfiguration mergeConfiguration = new MergeConfiguration(config);
+
+
+ //Watermark
WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy
- .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(GlobalConfig.WARTERMARK_MAX_ORDERNESS))
- .withTimestampAssigner((element,timestamp) -> element.f2);
+ .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WATERMARK_MAX_ORDERNESS)))
+ .withTimestampAssigner((element, timestamp) -> element.f2);
//Data source
- DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
- .setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC);
+ DataStream<String> streamSource = environment.addSource(
+ KafkaConsumer.getKafkaConsumer(mergeConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
+ config.get(SOURCE_KAFKA_TOPIC),
+ config.get(STARTUP_MODE)));
//Parse data
SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData())
.assignTimestampsAndWatermarks(strategyForSession)
- .name("ParseDataProcess")
- .setParallelism(GlobalConfig.PARSE_PARALLELISM);
+ .name("ParseDataProcess");
//Incremental aggregation window
SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
- .window(TumblingEventTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
+ .window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME))))
.reduce(new DispersionCountWindow(), new MergeCountWindow())
- .name("DispersionCountWindow")
- .setParallelism(GlobalConfig.WINDOW_PARALLELISM);
+ .name("DispersionCountWindow");
//Split data
SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
- .name("ResultFlatMap").setParallelism(GlobalConfig.SINK_PARALLELISM);
+ .name("ResultFlatMap");
//Output
- resultFlatMap.addSink(KafkaProducer.getKafkaProducer())
- .setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC);
+ resultFlatMap.addSink(KafkaProducer.getKafkaProducer(mergeConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
+ config.get(SINK_KAFKA_TOPIC),
+ config.get(LOG_FAILURES_ONLY)));
environment.execute("APP-PROTOCOL-STAT-TRAFFIC-MERGE");
} catch (Exception e) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
index 22f2f51..28c01f5 100644
--- a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
+++ b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
@@ -3,9 +3,6 @@ package com.zdjizhi.utils.functions.map;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
-import com.alibaba.fastjson2.JSONWriter;
-import com.zdjizhi.common.config.GlobalConfig;
-import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.StringUtil;
@@ -20,9 +17,14 @@ import org.apache.flink.util.Collector;
*/
public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
private static final Log logger = LogFactory.get();
+ /**
+ * Protocol separator; must be escaped because it is used as a regex
+ */
+ private static final String PROTOCOL_SPLITTER = "\\.";
+
@Override
- public void flatMap(Metrics metrics, Collector<String> out) throws Exception {
+ public void flatMap(Metrics metrics, Collector<String> out) {
try {
Tags tags = metrics.getTags();
String protocolStackId = tags.getProtocol_stack_id();
@@ -30,7 +32,7 @@ public class ResultFlatMap implements FlatMapFunction {
tags.setApp_name(null);
StringBuilder stringBuilder = new StringBuilder();
- String[] protocolIds = protocolStackId.split(GlobalConfig.PROTOCOL_SPLITTER);
+ String[] protocolIds = protocolStackId.split(PROTOCOL_SPLITTER);
int protocolIdsNum = protocolIds.length;
for (int i = 0; i < protocolIdsNum - 1; i++) {
if (StringUtil.isBlank(stringBuilder.toString())) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
index 8216320..44276e2 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
@@ -19,8 +19,9 @@ import org.apache.flink.api.java.tuple.Tuple3;
public class DispersionCountWindow implements ReduceFunction<Tuple3<Tags, Fields, Long>> {
private static final Log logger = LogFactory.get();
+
@Override
- public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) throws Exception {
+ public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) {
try {
Fields cacheData = value1.f1;
Fields newData = value2.f1;
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
index 2677855..4766000 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
@@ -2,12 +2,12 @@ package com.zdjizhi.utils.functions.statistics;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
+import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -18,17 +18,30 @@ import org.apache.flink.util.Collector;
* @Description:
* @date 2023/4/23 14:43
*/
-public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields, Long>, Metrics, String, TimeWindow> {
+public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields, Long>, Metrics, String, TimeWindow> {
private static final Log logger = LogFactory.get();
+ private String measurementName = null;
+
@Override
- public void process(String windowKey, Context context, Iterable<Tuple3<Tags, Fields, Long>> input, Collector<Metrics> output) throws Exception {
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
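+ // Read the measurement name once from the global job parameters registered in ApplicationProtocolTopology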
+ final Configuration configuration = (Configuration) getRuntimeContext()
+ .getExecutionConfig().getGlobalJobParameters();
+
+ measurementName = configuration.get(MergeConfigs.MEASUREMENT_NAME);
+
+ }
+
+ @Override
+ public void process(String windowKey, Context context, Iterable<Tuple3<Tags, Fields, Long>> input, Collector<Metrics> output) {
try {
long timestamp_ms = context.window().getStart();
- for (Tuple3<Tags, Fields, Long> tuple : input) {
+ for (Tuple3<Tags, Fields, Long> tuple : input) {
Tags tags = tuple.f0;
Fields fields = tuple.f1;
- Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, timestamp_ms);
+ Metrics metrics = new Metrics(NAME, tags, fields, timestamp_ms);
output.collect(metrics);
}
diff --git a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
index 6a48bcf..cc8b32c 100644
--- a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
+++ b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
@@ -2,7 +2,6 @@ package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.utils.StringUtil;
import org.apache.datasketches.hll.HllSketch;
@@ -19,8 +18,6 @@ import java.util.Base64;
*/
public class MetricUtil {
private static final Log logger = LogFactory.get();
- private static final String METRICS_DEFAULT_TYPE = "agent";
-
/**
* Used to aggregate the business metrics
@@ -58,28 +55,26 @@ public class MetricUtil {
Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes());
Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes());
- if (METRICS_DEFAULT_TYPE.equals(GlobalConfig.METICS_DATA_SOURCE)) {
- String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch());
- return new Fields(sessions,
- inBytes, outBytes, inPkts, outPkts,
- c2sPkts, s2cPkts, c2sBytes, s2cBytes,
- c2sFragments, s2cFragments,
- c2sTcpLostBytes, s2cTcpLostBytes,
- c2sTcpooorderPkts, s2cTcpooorderPkts,
- c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
- c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
- clientIpSketch);
- } else {
- return new Fields(sessions,
- inBytes, outBytes, inPkts, outPkts,
- c2sPkts, s2cPkts, c2sBytes, s2cBytes,
- c2sFragments, s2cFragments,
- c2sTcpLostBytes, s2cTcpLostBytes,
- c2sTcpooorderPkts, s2cTcpooorderPkts,
- c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
- c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
- null);
- }
+// String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch());
+// return new Fields(sessions,
+// inBytes, outBytes, inPkts, outPkts,
+// c2sPkts, s2cPkts, c2sBytes, s2cBytes,
+// c2sFragments, s2cFragments,
+// c2sTcpLostBytes, s2cTcpLostBytes,
+// c2sTcpooorderPkts, s2cTcpooorderPkts,
+// c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
+// c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
+// clientIpSketch);
+
+ return new Fields(sessions,
+ inBytes, outBytes, inPkts, outPkts,
+ c2sPkts, s2cPkts, c2sBytes, s2cBytes,
+ c2sFragments, s2cFragments,
+ c2sTcpLostBytes, s2cTcpLostBytes,
+ c2sTcpooorderPkts, s2cTcpooorderPkts,
+ c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
+ c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
+ null);
}
/**
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
deleted file mode 100644
index 877b2e6..0000000
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.zdjizhi.utils.kafka;
-
-import com.zdjizhi.common.config.GlobalConfig;
-import org.apache.kafka.common.config.SslConfigs;
-
-import java.util.Properties;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.kafka
- * @Description:
- * @date 2021/9/6 10:37
- */
-class CertUtils {
- /**
- * Kafka SASL认证端口
- */
- private static final String SASL_PORT = "9094";
-
- /**
- * Kafka SSL认证端口
- */
- private static final String SSL_PORT = "9095";
-
- /**
- * Choose the authentication method based on the port in the connection string.
- *
- * @param servers kafka connection string
- * @param properties kafka connection properties
- */
- static void chooseCert(String servers, Properties properties) {
- if (servers.contains(SASL_PORT)) {
- properties.put("security.protocol", "SASL_PLAINTEXT");
- properties.put("sasl.mechanism", "PLAIN");
- properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
- + GlobalConfig.KAFKA_SASL_JAAS_USER + " password=" + GlobalConfig.KAFKA_SASL_JAAS_PIN + ";");
- } else if (servers.contains(SSL_PORT)) {
- properties.put("security.protocol", "SSL");
- properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- properties.put("ssl.keystore.location", GlobalConfig.TOOLS_LIBRARY + "keystore.jks");
- properties.put("ssl.keystore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
- properties.put("ssl.truststore.location", GlobalConfig.TOOLS_LIBRARY + "truststore.jks");
- properties.put("ssl.truststore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
- properties.put("ssl.key.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
- }
-
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index b0bc2fb..397814f 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -1,8 +1,8 @@
package com.zdjizhi.utils.kafka;
-import com.zdjizhi.common.config.GlobalConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
@@ -13,35 +13,40 @@ import java.util.Properties;
* @date 2021/6/8 13:54
*/
public class KafkaConsumer {
- private static Properties createConsumerConfig() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", GlobalConfig.SOURCE_KAFKA_SERVERS);
- properties.put("group.id", GlobalConfig.GROUP_ID);
- properties.put("session.timeout.ms", GlobalConfig.SESSION_TIMEOUT_MS);
- properties.put("max.poll.records", GlobalConfig.MAX_POLL_RECORDS);
- properties.put("max.partition.fetch.bytes", GlobalConfig.MAX_PARTITION_FETCH_BYTES);
- properties.put("partition.discovery.interval.ms", "10000");
-
- CertUtils.chooseCert(GlobalConfig.SOURCE_KAFKA_SERVERS, properties);
-
- return properties;
- }
/**
* Deserialize Kafka records with the official schema
*
* @return kafka logs
*/
- public static FlinkKafkaConsumer<String> getKafkaConsumer() {
- FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(GlobalConfig.SOURCE_KAFKA_TOPIC,
- new SimpleStringSchema(), createConsumerConfig());
+ public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) {
- //Commit offsets to Kafka as checkpoints complete
- kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
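+ // Apply conservative consumer defaults unless already provided via source.kafka.props.*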
+ setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
+ setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
+ setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280);
- //Start consuming from the consumer group's current offsets
- kafkaConsumer.setStartFromGroupOffsets();
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
+
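+ // Map startup.mode onto the consumer's start position; unknown values fall back to group offsets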
+ switch (startupMode) {
+ case "group":
+ kafkaConsumer.setStartFromGroupOffsets();
+ break;
+ case "latest":
+ kafkaConsumer.setStartFromLatest();
+ break;
+ case "earliest":
+ kafkaConsumer.setStartFromEarliest();
+ break;
+ default:
+ kafkaConsumer.setStartFromGroupOffsets();
+ }
return kafkaConsumer;
}
+
+ private static void setDefaultConfig(Properties properties, String key, Object value) {
+ if (!properties.containsKey(key)) {
+ properties.put(key, value);
+ }
+ }
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index ca62061..c7cd3f2 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -1,9 +1,7 @@
package com.zdjizhi.utils.kafka;
-import com.zdjizhi.common.config.GlobalConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Optional;
import java.util.Properties;
@@ -16,33 +14,29 @@ import java.util.Properties;
*/
public class KafkaProducer {
- private static Properties createProducerConfig() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", GlobalConfig.SINK_KAFKA_SERVERS);
- properties.put("acks", GlobalConfig.PRODUCER_ACK);
- properties.put("retries", GlobalConfig.RETRIES);
- properties.put("linger.ms", GlobalConfig.LINGER_MS);
- properties.put("request.timeout.ms", GlobalConfig.REQUEST_TIMEOUT_MS);
- properties.put("batch.size", GlobalConfig.BATCH_SIZE);
- properties.put("buffer.memory", GlobalConfig.BUFFER_MEMORY);
- properties.put("max.request.size", GlobalConfig.MAX_REQUEST_SIZE);
- properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, GlobalConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+ public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
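+ // Apply producer defaults (acks, batching, compression) unless already provided via sink.kafka.props.*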
+ setDefaultConfig(properties, "ack", 1);
+ setDefaultConfig(properties, "retries", 0);
+ setDefaultConfig(properties, "linger.ms", 10);
+ setDefaultConfig(properties, "request.timeout.ms", 30000);
+ setDefaultConfig(properties, "batch.size", 262144);
+ setDefaultConfig(properties, "buffer.memory", 134217728);
+ setDefaultConfig(properties, "max.request.size", 10485760);
+ setDefaultConfig(properties, "compression.type", "snappy");
- CertUtils.chooseCert(GlobalConfig.SINK_KAFKA_SERVERS, properties);
-
- return properties;
- }
-
-
- public static FlinkKafkaProducer<String> getKafkaProducer() {
- FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
- GlobalConfig.SINK_KAFKA_TOPIC,
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
+ topic,
new SimpleStringSchema(),
- createProducerConfig(), Optional.empty());
+ properties, Optional.empty());
- //With this option enabled, the producer only logs failures instead of catching and rethrowing them
- kafkaProducer.setLogFailuresOnly(true);
+ kafkaProducer.setLogFailuresOnly(logFailuresOnly);
return kafkaProducer;
}
+
+ private static void setDefaultConfig(Properties properties, String key, Object value) {
+ if (!properties.containsKey(key)) {
+ properties.put(key, value);
+ }
+ }
}
diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java
index 7b9580b..7a2b5d3 100644
--- a/src/test/java/com/zdjizhi/ConfigTest.java
+++ b/src/test/java/com/zdjizhi/ConfigTest.java
@@ -1,11 +1,10 @@
package com.zdjizhi;
import com.zdjizhi.conf.FusionConfiguration;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -30,6 +29,7 @@ public class ConfigTest {
System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
+
final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
config.get(SOURCE_KAFKA_TOPIC),
new SimpleStringSchema(),
@@ -41,12 +41,14 @@ public class ConfigTest {
sourceStream.process(new ProcessFunction<String, String>() {
@Override
- public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
+ public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) {
out.collect(value);
}
}).print();
+
+
environment.execute();
} catch (Exception e) {
diff --git a/src/test/java/com/zdjizhi/ConventionalTest.java b/src/test/java/com/zdjizhi/ConventionalTest.java
index 7900d61..287b3fb 100644
--- a/src/test/java/com/zdjizhi/ConventionalTest.java
+++ b/src/test/java/com/zdjizhi/ConventionalTest.java
@@ -1,6 +1,5 @@
package com.zdjizhi;
-import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.utils.StringUtil;
import org.junit.Test;
@@ -22,7 +21,7 @@ public class ConventionalTest {
System.out.println(protocol);
StringBuffer stringBuffer = new StringBuffer();
String appName = "qq_r2";
- String[] protocolIds = protocol.split(GlobalConfig.PROTOCOL_SPLITTER);
+ String[] protocolIds = protocol.split("\\.");
for (String proto : protocolIds) {
if (StringUtil.isBlank(stringBuffer.toString())) {
stringBuffer.append(proto);