diff --git a/pom.xml b/pom.xml
index 7c34db7..0624c9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
app-protocol-stat-traffic-merge
- 2.2.2
+ 2.3.0
app-protocol-stat-traffic-merge
http://www.example.com
diff --git a/src/main/java/com/zdjizhi/common/pojo/Data.java b/src/main/java/com/zdjizhi/common/pojo/Data.java
new file mode 100644
index 0000000..3be235c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/pojo/Data.java
@@ -0,0 +1,251 @@
+package com.zdjizhi.common.pojo;
+
+import java.io.Serializable;
+
+public class Data implements Serializable {
+ public long timestamp_ms;
+ public String name;
+
+ public int vsys_id;
+ public String device_id;
+ public String device_group;
+ public String data_center;
+ public String decoded_path;
+ public String app;
+
+ public long sessions;
+ public long in_bytes;
+ public long out_bytes;
+ public long in_pkts;
+ public long out_pkts;
+ public long c2s_pkts;
+ public long s2c_pkts;
+ public long c2s_bytes;
+ public long s2c_bytes;
+ public long c2s_fragments;
+ public long s2c_fragments;
+ public long c2s_tcp_lost_bytes;
+ public long s2c_tcp_lost_bytes;
+ public long c2s_tcp_ooorder_pkts;
+ public long s2c_tcp_ooorder_pkts;
+ public long c2s_tcp_retransmitted_pkts;
+ public long s2c_tcp_retransmitted_pkts;
+ public long c2s_tcp_retransmitted_bytes;
+ public long s2c_tcp_retransmitted_bytes;
+
+ public long getTimestamp_ms() {
+ return timestamp_ms;
+ }
+
+ public void setTimestamp_ms(long timestamp_ms) {
+ this.timestamp_ms = timestamp_ms;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(int vsys_id) {
+ this.vsys_id = vsys_id;
+ }
+
+ public String getDevice_id() {
+ return device_id;
+ }
+
+ public void setDevice_id(String device_id) {
+ this.device_id = device_id;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+ public String getData_center() {
+ return data_center;
+ }
+
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+ public String getDecoded_path() {
+ return decoded_path;
+ }
+
+ public void setDecoded_path(String decoded_path) {
+ this.decoded_path = decoded_path;
+ }
+
+ public String getApp() {
+ return app;
+ }
+
+ public void setApp(String app) {
+ this.app = app;
+ }
+
+ public long getSessions() {
+ return sessions;
+ }
+
+ public void setSessions(long sessions) {
+ this.sessions = sessions;
+ }
+
+ public long getIn_bytes() {
+ return in_bytes;
+ }
+
+ public void setIn_bytes(long in_bytes) {
+ this.in_bytes = in_bytes;
+ }
+
+ public long getOut_bytes() {
+ return out_bytes;
+ }
+
+ public void setOut_bytes(long out_bytes) {
+ this.out_bytes = out_bytes;
+ }
+
+ public long getIn_pkts() {
+ return in_pkts;
+ }
+
+ public void setIn_pkts(long in_pkts) {
+ this.in_pkts = in_pkts;
+ }
+
+ public long getOut_pkts() {
+ return out_pkts;
+ }
+
+ public void setOut_pkts(long out_pkts) {
+ this.out_pkts = out_pkts;
+ }
+
+ public long getC2s_pkts() {
+ return c2s_pkts;
+ }
+
+ public void setC2s_pkts(long c2s_pkts) {
+ this.c2s_pkts = c2s_pkts;
+ }
+
+ public long getS2c_pkts() {
+ return s2c_pkts;
+ }
+
+ public void setS2c_pkts(long s2c_pkts) {
+ this.s2c_pkts = s2c_pkts;
+ }
+
+ public long getC2s_bytes() {
+ return c2s_bytes;
+ }
+
+ public void setC2s_bytes(long c2s_bytes) {
+ this.c2s_bytes = c2s_bytes;
+ }
+
+ public long getS2c_bytes() {
+ return s2c_bytes;
+ }
+
+ public void setS2c_bytes(long s2c_bytes) {
+ this.s2c_bytes = s2c_bytes;
+ }
+
+ public long getC2s_fragments() {
+ return c2s_fragments;
+ }
+
+ public void setC2s_fragments(long c2s_fragments) {
+ this.c2s_fragments = c2s_fragments;
+ }
+
+ public long getS2c_fragments() {
+ return s2c_fragments;
+ }
+
+ public void setS2c_fragments(long s2c_fragments) {
+ this.s2c_fragments = s2c_fragments;
+ }
+
+ public long getC2s_tcp_lost_bytes() {
+ return c2s_tcp_lost_bytes;
+ }
+
+ public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) {
+ this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
+ }
+
+ public long getS2c_tcp_lost_bytes() {
+ return s2c_tcp_lost_bytes;
+ }
+
+ public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) {
+ this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
+ }
+
+ public long getC2s_tcp_ooorder_pkts() {
+ return c2s_tcp_ooorder_pkts;
+ }
+
+ public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) {
+ this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
+ }
+
+ public long getS2c_tcp_ooorder_pkts() {
+ return s2c_tcp_ooorder_pkts;
+ }
+
+ public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) {
+ this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
+ }
+
+ public long getC2s_tcp_retransmitted_pkts() {
+ return c2s_tcp_retransmitted_pkts;
+ }
+
+ public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) {
+ this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
+ }
+
+ public long getS2c_tcp_retransmitted_pkts() {
+ return s2c_tcp_retransmitted_pkts;
+ }
+
+ public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) {
+ this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
+ }
+
+ public long getC2s_tcp_retransmitted_bytes() {
+ return c2s_tcp_retransmitted_bytes;
+ }
+
+ public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) {
+ this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
+ }
+
+ public long getS2c_tcp_retransmitted_bytes() {
+ return s2c_tcp_retransmitted_bytes;
+ }
+
+ public void setS2c_tcp_retransmitted_bytes(long s2c_tcp_retransmitted_bytes) {
+ this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/pojo/ResultData.java b/src/main/java/com/zdjizhi/common/pojo/ResultData.java
new file mode 100644
index 0000000..b0907c1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/pojo/ResultData.java
@@ -0,0 +1,251 @@
+package com.zdjizhi.common.pojo;
+
+import java.io.Serializable;
+
+public class ResultData implements Serializable {
+ public long timestamp_ms;
+ public String name;
+
+ public int vsys_id;
+ public String device_id;
+ public String device_group;
+ public String data_center;
+ public String protocol_stack_id;
+ public String app_name;
+
+ public long sessions;
+ public long in_bytes;
+ public long out_bytes;
+ public long in_pkts;
+ public long out_pkts;
+ public long c2s_pkts;
+ public long s2c_pkts;
+ public long c2s_bytes;
+ public long s2c_bytes;
+ public long c2s_fragments;
+ public long s2c_fragments;
+ public long c2s_tcp_lost_bytes;
+ public long s2c_tcp_lost_bytes;
+ public long c2s_tcp_ooorder_pkts;
+ public long s2c_tcp_ooorder_pkts;
+ public long c2s_tcp_retransmitted_pkts;
+ public long s2c_tcp_retransmitted_pkts;
+ public long c2s_tcp_retransmitted_bytes;
+ public long s2c_tcp_retransmitted_bytes;
+
+ public long getTimestamp_ms() {
+ return timestamp_ms;
+ }
+
+ public void setTimestamp_ms(long timestamp_ms) {
+ this.timestamp_ms = timestamp_ms;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(int vsys_id) {
+ this.vsys_id = vsys_id;
+ }
+
+ public String getDevice_id() {
+ return device_id;
+ }
+
+ public void setDevice_id(String device_id) {
+ this.device_id = device_id;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+ public String getData_center() {
+ return data_center;
+ }
+
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+ public String getProtocol_stack_id() {
+ return protocol_stack_id;
+ }
+
+ public void setProtocol_stack_id(String protocol_stack_id) {
+ this.protocol_stack_id = protocol_stack_id;
+ }
+
+ public String getApp_name() {
+ return app_name;
+ }
+
+ public void setApp_name(String app_name) {
+ this.app_name = app_name;
+ }
+
+ public long getSessions() {
+ return sessions;
+ }
+
+ public void setSessions(long sessions) {
+ this.sessions = sessions;
+ }
+
+ public long getIn_bytes() {
+ return in_bytes;
+ }
+
+ public void setIn_bytes(long in_bytes) {
+ this.in_bytes = in_bytes;
+ }
+
+ public long getOut_bytes() {
+ return out_bytes;
+ }
+
+ public void setOut_bytes(long out_bytes) {
+ this.out_bytes = out_bytes;
+ }
+
+ public long getIn_pkts() {
+ return in_pkts;
+ }
+
+ public void setIn_pkts(long in_pkts) {
+ this.in_pkts = in_pkts;
+ }
+
+ public long getOut_pkts() {
+ return out_pkts;
+ }
+
+ public void setOut_pkts(long out_pkts) {
+ this.out_pkts = out_pkts;
+ }
+
+ public long getC2s_pkts() {
+ return c2s_pkts;
+ }
+
+ public void setC2s_pkts(long c2s_pkts) {
+ this.c2s_pkts = c2s_pkts;
+ }
+
+ public long getS2c_pkts() {
+ return s2c_pkts;
+ }
+
+ public void setS2c_pkts(long s2c_pkts) {
+ this.s2c_pkts = s2c_pkts;
+ }
+
+ public long getC2s_bytes() {
+ return c2s_bytes;
+ }
+
+ public void setC2s_bytes(long c2s_bytes) {
+ this.c2s_bytes = c2s_bytes;
+ }
+
+ public long getS2c_bytes() {
+ return s2c_bytes;
+ }
+
+ public void setS2c_bytes(long s2c_bytes) {
+ this.s2c_bytes = s2c_bytes;
+ }
+
+ public long getC2s_fragments() {
+ return c2s_fragments;
+ }
+
+ public void setC2s_fragments(long c2s_fragments) {
+ this.c2s_fragments = c2s_fragments;
+ }
+
+ public long getS2c_fragments() {
+ return s2c_fragments;
+ }
+
+ public void setS2c_fragments(long s2c_fragments) {
+ this.s2c_fragments = s2c_fragments;
+ }
+
+ public long getC2s_tcp_lost_bytes() {
+ return c2s_tcp_lost_bytes;
+ }
+
+ public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) {
+ this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
+ }
+
+ public long getS2c_tcp_lost_bytes() {
+ return s2c_tcp_lost_bytes;
+ }
+
+ public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) {
+ this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
+ }
+
+ public long getC2s_tcp_ooorder_pkts() {
+ return c2s_tcp_ooorder_pkts;
+ }
+
+ public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) {
+ this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
+ }
+
+ public long getS2c_tcp_ooorder_pkts() {
+ return s2c_tcp_ooorder_pkts;
+ }
+
+ public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) {
+ this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
+ }
+
+ public long getC2s_tcp_retransmitted_pkts() {
+ return c2s_tcp_retransmitted_pkts;
+ }
+
+ public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) {
+ this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
+ }
+
+ public long getS2c_tcp_retransmitted_pkts() {
+ return s2c_tcp_retransmitted_pkts;
+ }
+
+ public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) {
+ this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
+ }
+
+ public long getC2s_tcp_retransmitted_bytes() {
+ return c2s_tcp_retransmitted_bytes;
+ }
+
+ public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) {
+ this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
+ }
+
+ public long getS2c_tcp_retransmitted_bytes() {
+ return s2c_tcp_retransmitted_bytes;
+ }
+
+ public void setS2c_tcp_retransmitted_bytes(long s2c_tcp_retransmitted_bytes) {
+ this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolLegacyTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolLegacyTopology.java
new file mode 100644
index 0000000..cc7ca0a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolLegacyTopology.java
@@ -0,0 +1,95 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.config.MergeConfigs;
+import com.zdjizhi.common.config.MergeConfiguration;
+import com.zdjizhi.common.pojo.Fields;
+import com.zdjizhi.common.pojo.Metrics;
+import com.zdjizhi.common.pojo.Tags;
+import com.zdjizhi.utils.functions.keyby.DimensionKeyBy;
+import com.zdjizhi.utils.functions.map.ResultFlatMap;
+import com.zdjizhi.utils.functions.process.ParsingData;
+import com.zdjizhi.utils.functions.statistics.DispersionCountWindow;
+import com.zdjizhi.utils.functions.statistics.MergeCountWindow;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import com.zdjizhi.utils.kafka.KafkaProducer;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.time.Duration;
+
+import static com.zdjizhi.common.config.MergeConfigs.*;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2021/5/2016:42
+ */
+public class ApplicationProtocolLegacyTopology {
+ private static final Log logger = LogFactory.get();
+
+ public static void main(String[] args) {
+ try {
+
+ // param check
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Error: Not found properties path. " +
+ "\nUsage: flink -c xxx xxx.jar app.properties.");
+ }
+
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
+
+ //水印
+ WatermarkStrategy> strategyForSession = WatermarkStrategy
+ .>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
+ .withTimestampAssigner((element, timestamp) -> element.f2);
+
+ //数据源
+ DataStream streamSource = environment.addSource(
+ KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
+ config.get(SOURCE_KAFKA_TOPIC),
+ config.get(STARTUP_MODE)));
+
+ //解析数据
+ SingleOutputStreamOperator> parseDataProcess = streamSource.process(new ParsingData())
+ .assignTimestampsAndWatermarks(strategyForSession)
+ .name("ParseDataProcess");
+
+ //增量聚合窗口
+ SingleOutputStreamOperator dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
+ .window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME))))
+ .reduce(new DispersionCountWindow(), new MergeCountWindow())
+ .name("DispersionCountWindow");
+
+ //拆分数据
+ SingleOutputStreamOperator resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
+ .name("ResultFlatMap");
+
+ //输出
+ resultFlatMap.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
+ config.get(SINK_KAFKA_TOPIC),
+ config.get(LOG_FAILURES_ONLY)));
+
+ environment.execute(config.get(JOB_NAME));
+ } catch (Exception e) {
+ logger.error("This Flink task start ERROR! Exception information is :");
+ e.printStackTrace();
+ }
+
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index d1a60c9..951703c 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -1,96 +1,267 @@
package com.zdjizhi.topology;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONPath;
+import com.alibaba.fastjson2.JSONReader;
import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.config.MergeConfiguration;
-import com.zdjizhi.common.pojo.Fields;
-import com.zdjizhi.common.pojo.Metrics;
-import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.functions.keyby.DimensionKeyBy;
-import com.zdjizhi.utils.functions.map.ResultFlatMap;
-import com.zdjizhi.utils.functions.process.ParsingData;
-import com.zdjizhi.utils.functions.statistics.DispersionCountWindow;
-import com.zdjizhi.utils.functions.statistics.MergeCountWindow;
+import com.zdjizhi.common.pojo.*;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.*;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.time.Duration;
+import java.util.Arrays;
import static com.zdjizhi.common.config.MergeConfigs.*;
/**
- * @author qidaijie
+ * @author lifengchao
* @Package com.zdjizhi.topology
* @Description:
- * @date 2021/5/2016:42
+ * @date 2024/7/23 11:20
*/
public class ApplicationProtocolTopology {
- private static final Log logger = LogFactory.get();
+ static final Logger LOG = LoggerFactory.getLogger(ApplicationProtocolTopology.class);
- public static void main(String[] args) {
- try {
-
- // param check
- if (args.length < 1) {
- throw new IllegalArgumentException("Error: Not found properties path. " +
- "\nUsage: flink -c xxx xxx.jar app.properties.");
- }
-
- final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-
- ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
- final Configuration config = tool.getConfiguration();
- environment.getConfig().setGlobalJobParameters(config);
- final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
-
-
- //水印
- WatermarkStrategy> strategyForSession = WatermarkStrategy
- .>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
- .withTimestampAssigner((element, timestamp) -> element.f2);
-
- //数据源
- DataStream streamSource = environment.addSource(
- KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
- config.get(SOURCE_KAFKA_TOPIC),
- config.get(STARTUP_MODE)));
-
- //解析数据
- SingleOutputStreamOperator> parseDataProcess = streamSource.process(new ParsingData())
- .assignTimestampsAndWatermarks(strategyForSession)
- .name("ParseDataProcess");
-
- //增量聚合窗口
- SingleOutputStreamOperator dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
- .window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME))))
- .reduce(new DispersionCountWindow(), new MergeCountWindow())
- .name("DispersionCountWindow");
-
- //拆分数据
- SingleOutputStreamOperator resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
- .name("ResultFlatMap");
-
- //输出
- resultFlatMap.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
- config.get(SINK_KAFKA_TOPIC),
- config.get(LOG_FAILURES_ONLY)));
-
- environment.execute(config.get(JOB_NAME));
- } catch (Exception e) {
- logger.error("This Flink task start ERROR! Exception information is :");
- e.printStackTrace();
+ public static void main(String[] args) throws Exception{
+ // param check
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Error: Not found properties path. " +
+ "\nUsage: flink -c xxx xxx.jar app.properties.");
}
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+ environment.setParallelism(1);
+
+ ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
+
+ //水印
+ WatermarkStrategy strategyForSession = WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
+ .withTimestampAssigner((element, timestamp) -> element.getTimestamp_ms());
+
+ //数据源
+ DataStream streamSource = environment.addSource(
+ KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
+ config.get(SOURCE_KAFKA_TOPIC),
+ config.get(STARTUP_MODE)));
+
+ //解析数据
+ SingleOutputStreamOperator dataStream = streamSource.flatMap(parseFlatMapFunction()).assignTimestampsAndWatermarks(strategyForSession);
+
+ //聚合 + 拆分
+ SingleOutputStreamOperator rstStream = dataStream.keyBy(keySelector())
+ .window(TumblingEventTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
+ .aggregate(aggregateFunction(), processWindowFunction());
+
+ //输出
+ rstStream.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX), config.get(SINK_KAFKA_TOPIC), config.get(LOG_FAILURES_ONLY)));
+
+ environment.execute(config.get(JOB_NAME));
}
+ private static KeySelector> keySelector(){
+ return new KeySelector>() {
+ @Override
+ public Tuple6 getKey(Data data) throws Exception {
+ return new Tuple6<>(data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
+ }
+ };
+ }
+
+ private static FlatMapFunction parseFlatMapFunction(){
+ return new RichFlatMapFunction() {
+ private JSONPath namePath;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ namePath = JSONPath.of("$.name");
+ }
+
+ @Override
+ public void flatMap(String value, Collector out) throws Exception {
+ JSONReader parser = JSONReader.of(value);
+ try {
+ Object name = namePath.extract(parser);
+ if(!"traffic_application_protocol_stat".equals(name)){
+ return;
+ }
+
+ Data data = JSON.parseObject(value, Data.class);
+ String decodedPath = data.getDecoded_path();
+ if(StringUtils.isBlank(decodedPath)){
+ return;
+ }
+
+ String appFullPath = data.getApp();
+ if(StringUtils.isBlank(appFullPath)){
+ out.collect(data);
+ return;
+ }
+ // decoded_path和app进行拼接,格式化
+ String[] appSplits = appFullPath.split("\\.");
+ data.setApp(appSplits[appSplits.length -1]);
+ String firstAppProtocol = appSplits[0];
+ String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
+ if (endProtocol.equals(firstAppProtocol)) {
+ if(appSplits.length > 1){
+ decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
+ data.setDecoded_path(decodedPath);
+ }
+ }else{
+ decodedPath = decodedPath + "." + appFullPath;
+ data.setDecoded_path(decodedPath);
+ }
+ out.collect(data);
+ } catch (Exception e) {
+ LOG.error("parse error for value:" + value, e);
+ } finally {
+ parser.close();
+ }
+ }
+ };
+ }
+
+ private static AggregateFunction aggregateFunction(){
+ return new AggregateFunction() {
+
+ @Override
+ public ResultData createAccumulator() {
+ return new ResultData();
+ }
+
+ @Override
+ public ResultData add(Data value, ResultData acc) {
+ acc.sessions = acc.sessions + value.sessions;
+ acc.in_bytes = acc.in_bytes + value.in_bytes;
+ acc.out_bytes = acc.out_bytes + value.out_bytes;
+ acc.in_pkts = acc.in_pkts + value.in_pkts;
+ acc.out_pkts = acc.out_pkts + value.out_pkts;
+ acc.c2s_pkts = acc.c2s_pkts + value.c2s_pkts;
+ acc.s2c_pkts = acc.s2c_pkts + value.s2c_pkts;
+ acc.c2s_bytes = acc.c2s_bytes + value.c2s_bytes;
+ acc.s2c_bytes = acc.s2c_bytes + value.s2c_bytes;
+ acc.c2s_fragments = acc.c2s_fragments + value.c2s_fragments;
+ acc.s2c_fragments = acc.s2c_fragments + value.s2c_fragments;
+ acc.c2s_tcp_lost_bytes = acc.c2s_tcp_lost_bytes + value.c2s_tcp_lost_bytes;
+ acc.s2c_tcp_lost_bytes = acc.s2c_tcp_lost_bytes + value.s2c_tcp_lost_bytes;
+ acc.c2s_tcp_ooorder_pkts = acc.c2s_tcp_ooorder_pkts + value.c2s_tcp_ooorder_pkts;
+ acc.s2c_tcp_ooorder_pkts = acc.s2c_tcp_ooorder_pkts + value.s2c_tcp_ooorder_pkts;
+ acc.c2s_tcp_retransmitted_pkts = acc.c2s_tcp_retransmitted_pkts + value.c2s_tcp_retransmitted_pkts;
+ acc.s2c_tcp_retransmitted_pkts = acc.s2c_tcp_retransmitted_pkts + value.s2c_tcp_retransmitted_pkts;
+ acc.c2s_tcp_retransmitted_bytes = acc.c2s_tcp_retransmitted_bytes + value.c2s_tcp_retransmitted_bytes;
+ acc.s2c_tcp_retransmitted_bytes = acc.s2c_tcp_retransmitted_bytes + value.s2c_tcp_retransmitted_bytes;
+ return acc;
+ }
+
+ @Override
+ public ResultData getResult(ResultData acc) {
+ return acc;
+ }
+
+ @Override
+ public ResultData merge(ResultData a, ResultData b) {
+ a.sessions = a.sessions + b.sessions;
+ a.in_bytes = a.in_bytes + b.in_bytes;
+ a.out_bytes = a.out_bytes + b.out_bytes;
+ a.in_pkts = a.in_pkts + b.in_pkts;
+ a.out_pkts = a.out_pkts + b.out_pkts;
+ a.c2s_pkts = a.c2s_pkts + b.c2s_pkts;
+ a.s2c_pkts = a.s2c_pkts + b.s2c_pkts;
+ a.c2s_bytes = a.c2s_bytes + b.c2s_bytes;
+ a.s2c_bytes = a.s2c_bytes + b.s2c_bytes;
+ a.c2s_fragments = a.c2s_fragments + b.c2s_fragments;
+ a.s2c_fragments = a.s2c_fragments + b.s2c_fragments;
+ a.c2s_tcp_lost_bytes = a.c2s_tcp_lost_bytes + b.c2s_tcp_lost_bytes;
+ a.s2c_tcp_lost_bytes = a.s2c_tcp_lost_bytes + b.s2c_tcp_lost_bytes;
+ a.c2s_tcp_ooorder_pkts = a.c2s_tcp_ooorder_pkts + b.c2s_tcp_ooorder_pkts;
+ a.s2c_tcp_ooorder_pkts = a.s2c_tcp_ooorder_pkts + b.s2c_tcp_ooorder_pkts;
+ a.c2s_tcp_retransmitted_pkts = a.c2s_tcp_retransmitted_pkts + b.c2s_tcp_retransmitted_pkts;
+ a.s2c_tcp_retransmitted_pkts = a.s2c_tcp_retransmitted_pkts + b.s2c_tcp_retransmitted_pkts;
+ a.c2s_tcp_retransmitted_bytes = a.c2s_tcp_retransmitted_bytes + b.c2s_tcp_retransmitted_bytes;
+ a.s2c_tcp_retransmitted_bytes = a.s2c_tcp_retransmitted_bytes + b.s2c_tcp_retransmitted_bytes;
+ return a;
+ }
+ };
+ }
+
+ private static ProcessWindowFunction, TimeWindow> processWindowFunction(){
+ return new ProcessWindowFunction, TimeWindow>() {
+ private String NAME = null;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ final Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
+ Preconditions.checkArgument(StringUtils.isNotBlank(NAME));
+ }
+
+ @Override
+ public void process(Tuple6 key, ProcessWindowFunction, TimeWindow>.Context context, Iterable elements, Collector out) throws Exception {
+ long timestamp_ms = context.window().getStart();
+ for (ResultData ele : elements) {
+ ele.timestamp_ms = timestamp_ms;
+ ele.name = NAME;
+ ele.vsys_id = key.f0;
+ ele.device_id = key.f1;
+ ele.device_group = key.f2;
+ ele.data_center = key.f3;
+ ele.app_name = null;
+ String decodedPath = key.f4;
+ String app = key.f5;
+
+ // 拆分
+ int index = decodedPath.indexOf('.');
+ String subDecodedPath;
+ while (index > 0) {
+ subDecodedPath = decodedPath.substring(0, index);
+ ele.protocol_stack_id = subDecodedPath;
+ out.collect(JSON.toJSONString(ele));
+ index = decodedPath.indexOf('.', index + 1);
+ }
+
+ ele.app_name = app;
+ ele.protocol_stack_id = decodedPath;
+ out.collect(JSON.toJSONString(ele));
+ }
+ }
+ };
+ }
+
+ private static DataStream testSource(StreamExecutionEnvironment environment){
+ return environment.fromCollection(Arrays.asList(
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssh\",\"app\":\"ssh\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":720,\"out_bytes\":1200,\"in_pkts\":8,\"out_pkts\":16,\"c2s_pkts\":16,\"s2c_pkts\":8,\"c2s_bytes\":1200,\"s2c_bytes\":720,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"Gary-ApplicationTest\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":2536,\"out_bytes\":2237,\"in_pkts\":8,\"out_pkts\":7,\"c2s_pkts\":7,\"s2c_pkts\":8,\"c2s_bytes\":2237,\"s2c_bytes\":2536,\"c2s_tcp_retransmitted_pkts\":2,\"c2s_tcp_retransmitted_bytes\":120,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"ms_edge\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":326282,\"out_bytes\":8322,\"in_pkts\":241,\"out_pkts\":125,\"c2s_pkts\":125,\"s2c_pkts\":241,\"c2s_bytes\":8322,\"s2c_bytes\":326282,\"s2c_tcp_ooorder_pkts\":1,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"in_pkts\":1,\"s2c_pkts\":1,\"s2c_bytes\":66,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"ssl.port_444\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}",
+ "{}"
+ ));
+ }
}