diff --git a/pom.xml b/pom.xml
index 0624c9e..2c6c9d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
app-protocol-stat-traffic-merge
- 2.3.0
+ 2.3.1
app-protocol-stat-traffic-merge
http://www.example.com
@@ -115,20 +115,20 @@
-
+
org.junit.jupiter
junit-jupiter-api
5.3.2
- compile
+ test
-
+
@@ -175,6 +175,18 @@
flink-connector-kafka_2.12
${flink.version}
+
+
+ org.xerial.snappy
+ snappy-java
+
+
+
+
+
+ org.xerial.snappy
+ snappy-java
+ 1.1.8.3
@@ -192,11 +204,11 @@
-
+
junit
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
index 8cf604a..c17dabf 100644
--- a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
@@ -35,6 +35,11 @@ public class MergeConfigs {
.noDefaultValue()
.withDescription("The Kafka topic used in the sink.");
+ public static final ConfigOption GRANULARITY_SECOND =
+ ConfigOptions.key("granularity.second")
+ .longType()
+ .defaultValue(1L)
+ .withDescription("granularity second");
public static final ConfigOption COUNT_WINDOW_TIME =
ConfigOptions.key("count.window.time")
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index 26111e0..66e6603 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -1,27 +1,24 @@
package com.zdjizhi.topology;
import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONPath;
-import com.alibaba.fastjson2.JSONReader;
import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.config.MergeConfiguration;
import com.zdjizhi.common.pojo.*;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -29,7 +26,6 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.Duration;
import java.util.Arrays;
import static com.zdjizhi.common.config.MergeConfigs.*;
@@ -57,23 +53,22 @@ public class ApplicationProtocolTopology {
environment.getConfig().setGlobalJobParameters(config);
final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
- //水印
- WatermarkStrategy strategyForSession = WatermarkStrategy
- .forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
- .withTimestampAssigner((element, timestamp) -> element.getTimestamp_ms());
-
//数据源
DataStream streamSource = environment.addSource(
KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
config.get(SOURCE_KAFKA_TOPIC),
config.get(STARTUP_MODE)));
+ final long granularityMs = config.get(GRANULARITY_SECOND) * 1000L;
+ Preconditions.checkArgument(granularityMs >= 1000L && granularityMs <= 60000L);
+ LOG.warn("granularityMs:{}", granularityMs);
+
//解析数据
- SingleOutputStreamOperator dataStream = streamSource.flatMap(parseFlatMapFunction()).assignTimestampsAndWatermarks(strategyForSession);
+ SingleOutputStreamOperator dataStream = streamSource.flatMap(parseFlatMapFunction(granularityMs));
//聚合 + 拆分
SingleOutputStreamOperator rstStream = dataStream.keyBy(keySelector())
- .window(TumblingEventTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
.aggregate(aggregateFunction(), processWindowFunction());
//输出
@@ -82,35 +77,38 @@ public class ApplicationProtocolTopology {
environment.execute(config.get(JOB_NAME));
}
- private static KeySelector> keySelector(){
- return new KeySelector>() {
+ private static KeySelector> keySelector(){
+ return new KeySelector>() {
@Override
- public Tuple6 getKey(Data data) throws Exception {
- return new Tuple6<>(data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
+ public Tuple7 getKey(Data data) throws Exception {
+ return new Tuple7<>(data.timestamp_ms, data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
}
};
}
- private static FlatMapFunction parseFlatMapFunction(){
+ private static FlatMapFunction parseFlatMapFunction(final long granularityMs){
return new RichFlatMapFunction() {
- private JSONPath namePath;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- namePath = JSONPath.of("$.name");
+ LOG.warn("granularityMs:{}", granularityMs);
}
@Override
public void flatMap(String value, Collector out) throws Exception {
- JSONReader parser = JSONReader.of(value);
try {
- Object name = namePath.extract(parser);
- if(!"traffic_application_protocol_stat".equals(name)){
+ // 先快速近似过滤
+ if(!value.contains("traffic_application_protocol_stat")){
return;
}
-
Data data = JSON.parseObject(value, Data.class);
+ // 精确过滤
+ if(!"traffic_application_protocol_stat".equals(data.name)){
+ return;
+ }
+ data.timestamp_ms = data.timestamp_ms / granularityMs * granularityMs;
+
String decodedPath = data.getDecoded_path();
if(StringUtils.isBlank(decodedPath)){
return;
@@ -138,8 +136,6 @@ public class ApplicationProtocolTopology {
out.collect(data);
} catch (Exception e) {
LOG.error("parse error for value:" + value, e);
- } finally {
- parser.close();
}
}
};
@@ -208,8 +204,8 @@ public class ApplicationProtocolTopology {
};
}
- private static ProcessWindowFunction, TimeWindow> processWindowFunction(){
- return new ProcessWindowFunction, TimeWindow>() {
+ private static ProcessWindowFunction, TimeWindow> processWindowFunction(){
+ return new ProcessWindowFunction, TimeWindow>() {
private String NAME = null;
@Override
@@ -221,18 +217,17 @@ public class ApplicationProtocolTopology {
}
@Override
- public void process(Tuple6 key, ProcessWindowFunction, TimeWindow>.Context context, Iterable elements, Collector out) throws Exception {
- long timestamp_ms = context.window().getStart();
+ public void process(Tuple7 key, ProcessWindowFunction, TimeWindow>.Context context, Iterable elements, Collector out) throws Exception {
for (ResultData ele : elements) {
- ele.timestamp_ms = timestamp_ms;
ele.name = NAME;
- ele.vsys_id = key.f0;
- ele.device_id = key.f1;
- ele.device_group = key.f2;
- ele.data_center = key.f3;
+ ele.timestamp_ms = key.f0;
+ ele.vsys_id = key.f1;
+ ele.device_id = key.f2;
+ ele.device_group = key.f3;
+ ele.data_center = key.f4;
ele.app_name = null;
- String decodedPath = key.f4;
- String app = key.f5;
+ String decodedPath = key.f5;
+ String app = key.f6;
// 拆分
int index = decodedPath.indexOf('.');
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyEventTime.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyEventTime.java
new file mode 100644
index 0000000..45edd6b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyEventTime.java
@@ -0,0 +1,267 @@
+package com.zdjizhi.topology;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONPath;
+import com.alibaba.fastjson2.JSONReader;
+import com.zdjizhi.common.config.MergeConfigs;
+import com.zdjizhi.common.config.MergeConfiguration;
+import com.zdjizhi.common.pojo.Data;
+import com.zdjizhi.common.pojo.ResultData;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import com.zdjizhi.utils.kafka.KafkaProducer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+
+import static com.zdjizhi.common.config.MergeConfigs.*;
+
+/**
+ * @author lifengchao
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2024/7/23 11:20
+ */
+public class ApplicationProtocolTopologyEventTime {
+ static final Logger LOG = LoggerFactory.getLogger(ApplicationProtocolTopologyEventTime.class);
+
+ public static void main(String[] args) throws Exception{
+ // param check
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Error: Not found properties path. " +
+ "\nUsage: flink -c xxx xxx.jar app.properties.");
+ }
+
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
+
+ //水印
+ WatermarkStrategy strategyForSession = WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
+ .withTimestampAssigner((element, timestamp) -> element.getTimestamp_ms());
+
+ //数据源
+ DataStream streamSource = environment.addSource(
+ KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
+ config.get(SOURCE_KAFKA_TOPIC),
+ config.get(STARTUP_MODE)));
+
+ //解析数据
+ SingleOutputStreamOperator dataStream = streamSource.flatMap(parseFlatMapFunction()).assignTimestampsAndWatermarks(strategyForSession);
+
+ //聚合 + 拆分
+ SingleOutputStreamOperator rstStream = dataStream.keyBy(keySelector())
+ .window(TumblingEventTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
+ .aggregate(aggregateFunction(), processWindowFunction());
+
+ //输出
+ rstStream.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX), config.get(SINK_KAFKA_TOPIC), config.get(LOG_FAILURES_ONLY)));
+
+ environment.execute(config.get(JOB_NAME));
+ }
+
+ private static KeySelector> keySelector(){
+ return new KeySelector>() {
+ @Override
+ public Tuple6 getKey(Data data) throws Exception {
+ return new Tuple6<>(data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
+ }
+ };
+ }
+
+ private static FlatMapFunction parseFlatMapFunction(){
+ return new RichFlatMapFunction() {
+ private JSONPath namePath;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ namePath = JSONPath.of("$.name");
+ }
+
+ @Override
+ public void flatMap(String value, Collector out) throws Exception {
+ JSONReader parser = JSONReader.of(value);
+ try {
+ Object name = namePath.extract(parser);
+ if(!"traffic_application_protocol_stat".equals(name)){
+ return;
+ }
+
+ Data data = JSON.parseObject(value, Data.class);
+ String decodedPath = data.getDecoded_path();
+ if(StringUtils.isBlank(decodedPath)){
+ return;
+ }
+
+ String appFullPath = data.getApp();
+ if(StringUtils.isBlank(appFullPath)){
+ out.collect(data);
+ return;
+ }
+ // decoded_path和app进行拼接,格式化
+ String[] appSplits = appFullPath.split("\\.");
+ data.setApp(appSplits[appSplits.length -1]);
+ String firstAppProtocol = appSplits[0];
+ String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
+ if (endProtocol.equals(firstAppProtocol)) {
+ if(appSplits.length > 1){
+ decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
+ data.setDecoded_path(decodedPath);
+ }
+ }else{
+ decodedPath = decodedPath + "." + appFullPath;
+ data.setDecoded_path(decodedPath);
+ }
+ out.collect(data);
+ } catch (Exception e) {
+ LOG.error("parse error for value:" + value, e);
+ } finally {
+ parser.close();
+ }
+ }
+ };
+ }
+
+ private static AggregateFunction aggregateFunction(){
+ return new AggregateFunction() {
+
+ @Override
+ public ResultData createAccumulator() {
+ return new ResultData();
+ }
+
+ @Override
+ public ResultData add(Data value, ResultData acc) {
+ acc.sessions = acc.sessions + value.sessions;
+ acc.in_bytes = acc.in_bytes + value.in_bytes;
+ acc.out_bytes = acc.out_bytes + value.out_bytes;
+ acc.in_pkts = acc.in_pkts + value.in_pkts;
+ acc.out_pkts = acc.out_pkts + value.out_pkts;
+ acc.c2s_pkts = acc.c2s_pkts + value.c2s_pkts;
+ acc.s2c_pkts = acc.s2c_pkts + value.s2c_pkts;
+ acc.c2s_bytes = acc.c2s_bytes + value.c2s_bytes;
+ acc.s2c_bytes = acc.s2c_bytes + value.s2c_bytes;
+ acc.c2s_fragments = acc.c2s_fragments + value.c2s_fragments;
+ acc.s2c_fragments = acc.s2c_fragments + value.s2c_fragments;
+ acc.c2s_tcp_lost_bytes = acc.c2s_tcp_lost_bytes + value.c2s_tcp_lost_bytes;
+ acc.s2c_tcp_lost_bytes = acc.s2c_tcp_lost_bytes + value.s2c_tcp_lost_bytes;
+ acc.c2s_tcp_ooorder_pkts = acc.c2s_tcp_ooorder_pkts + value.c2s_tcp_ooorder_pkts;
+ acc.s2c_tcp_ooorder_pkts = acc.s2c_tcp_ooorder_pkts + value.s2c_tcp_ooorder_pkts;
+ acc.c2s_tcp_retransmitted_pkts = acc.c2s_tcp_retransmitted_pkts + value.c2s_tcp_retransmitted_pkts;
+ acc.s2c_tcp_retransmitted_pkts = acc.s2c_tcp_retransmitted_pkts + value.s2c_tcp_retransmitted_pkts;
+ acc.c2s_tcp_retransmitted_bytes = acc.c2s_tcp_retransmitted_bytes + value.c2s_tcp_retransmitted_bytes;
+ acc.s2c_tcp_retransmitted_bytes = acc.s2c_tcp_retransmitted_bytes + value.s2c_tcp_retransmitted_bytes;
+ return acc;
+ }
+
+ @Override
+ public ResultData getResult(ResultData acc) {
+ return acc;
+ }
+
+ @Override
+ public ResultData merge(ResultData a, ResultData b) {
+ a.sessions = a.sessions + b.sessions;
+ a.in_bytes = a.in_bytes + b.in_bytes;
+ a.out_bytes = a.out_bytes + b.out_bytes;
+ a.in_pkts = a.in_pkts + b.in_pkts;
+ a.out_pkts = a.out_pkts + b.out_pkts;
+ a.c2s_pkts = a.c2s_pkts + b.c2s_pkts;
+ a.s2c_pkts = a.s2c_pkts + b.s2c_pkts;
+ a.c2s_bytes = a.c2s_bytes + b.c2s_bytes;
+ a.s2c_bytes = a.s2c_bytes + b.s2c_bytes;
+ a.c2s_fragments = a.c2s_fragments + b.c2s_fragments;
+ a.s2c_fragments = a.s2c_fragments + b.s2c_fragments;
+ a.c2s_tcp_lost_bytes = a.c2s_tcp_lost_bytes + b.c2s_tcp_lost_bytes;
+ a.s2c_tcp_lost_bytes = a.s2c_tcp_lost_bytes + b.s2c_tcp_lost_bytes;
+ a.c2s_tcp_ooorder_pkts = a.c2s_tcp_ooorder_pkts + b.c2s_tcp_ooorder_pkts;
+ a.s2c_tcp_ooorder_pkts = a.s2c_tcp_ooorder_pkts + b.s2c_tcp_ooorder_pkts;
+ a.c2s_tcp_retransmitted_pkts = a.c2s_tcp_retransmitted_pkts + b.c2s_tcp_retransmitted_pkts;
+ a.s2c_tcp_retransmitted_pkts = a.s2c_tcp_retransmitted_pkts + b.s2c_tcp_retransmitted_pkts;
+ a.c2s_tcp_retransmitted_bytes = a.c2s_tcp_retransmitted_bytes + b.c2s_tcp_retransmitted_bytes;
+ a.s2c_tcp_retransmitted_bytes = a.s2c_tcp_retransmitted_bytes + b.s2c_tcp_retransmitted_bytes;
+ return a;
+ }
+ };
+ }
+
+ private static ProcessWindowFunction, TimeWindow> processWindowFunction(){
+ return new ProcessWindowFunction, TimeWindow>() {
+ private String NAME = null;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ final Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
+ Preconditions.checkArgument(StringUtils.isNotBlank(NAME));
+ }
+
+ @Override
+ public void process(Tuple6 key, ProcessWindowFunction, TimeWindow>.Context context, Iterable elements, Collector out) throws Exception {
+ long timestamp_ms = context.window().getStart();
+ for (ResultData ele : elements) {
+ ele.timestamp_ms = timestamp_ms;
+ ele.name = NAME;
+ ele.vsys_id = key.f0;
+ ele.device_id = key.f1;
+ ele.device_group = key.f2;
+ ele.data_center = key.f3;
+ ele.app_name = null;
+ String decodedPath = key.f4;
+ String app = key.f5;
+
+ // 拆分
+ int index = decodedPath.indexOf('.');
+ String subDecodedPath;
+ while (index > 0) {
+ subDecodedPath = decodedPath.substring(0, index);
+ ele.protocol_stack_id = subDecodedPath;
+ out.collect(JSON.toJSONString(ele));
+ index = decodedPath.indexOf('.', index + 1);
+ }
+
+ ele.app_name = app;
+ ele.protocol_stack_id = decodedPath;
+ out.collect(JSON.toJSONString(ele));
+ }
+ }
+ };
+ }
+
+ private static DataStream testSource(StreamExecutionEnvironment environment){
+ return environment.fromCollection(Arrays.asList(
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssh\",\"app\":\"ssh\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":720,\"out_bytes\":1200,\"in_pkts\":8,\"out_pkts\":16,\"c2s_pkts\":16,\"s2c_pkts\":8,\"c2s_bytes\":1200,\"s2c_bytes\":720,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"Gary-ApplicationTest\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":2536,\"out_bytes\":2237,\"in_pkts\":8,\"out_pkts\":7,\"c2s_pkts\":7,\"s2c_pkts\":8,\"c2s_bytes\":2237,\"s2c_bytes\":2536,\"c2s_tcp_retransmitted_pkts\":2,\"c2s_tcp_retransmitted_bytes\":120,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"ms_edge\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":326282,\"out_bytes\":8322,\"in_pkts\":241,\"out_pkts\":125,\"c2s_pkts\":125,\"s2c_pkts\":241,\"c2s_bytes\":8322,\"s2c_bytes\":326282,\"s2c_tcp_ooorder_pkts\":1,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"in_pkts\":1,\"s2c_pkts\":1,\"s2c_bytes\":66,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"ssl.port_444\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}",
+ "{}"
+ ));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyTest.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyTest.java
new file mode 100644
index 0000000..f42c021
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopologyTest.java
@@ -0,0 +1,269 @@
+package com.zdjizhi.topology;
+
+import com.alibaba.fastjson2.JSON;
+import com.zdjizhi.common.config.MergeConfigs;
+import com.zdjizhi.common.config.MergeConfiguration;
+import com.zdjizhi.common.pojo.Data;
+import com.zdjizhi.common.pojo.ResultData;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+import static com.zdjizhi.common.config.MergeConfigs.*;
+
+/**
+ * @author lifengchao
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2024/7/23 11:20
+ */
+public class ApplicationProtocolTopologyTest {
+ static final Logger LOG = LoggerFactory.getLogger(ApplicationProtocolTopologyTest.class);
+
+ public static void main(String[] args) throws Exception{
+ // param check
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Error: Not found properties path. " +
+ "\nUsage: flink -c xxx xxx.jar app.properties.");
+ }
+
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+ environment.setParallelism(1);
+
+ ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
+
+ //数据源
+ //DataStream streamSource = testSource(environment);
+ DataStream streamSource = environment.addSource(
+ KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
+ config.get(SOURCE_KAFKA_TOPIC),
+ config.get(STARTUP_MODE)));
+
+ final long granularityMs = config.get(GRANULARITY_SECOND) * 1000L;
+ Preconditions.checkArgument(granularityMs >= 1000L && granularityMs <= 60000L);
+ LOG.info("granularityMs:{}", granularityMs);
+
+ //解析数据
+ SingleOutputStreamOperator dataStream = streamSource.flatMap(parseFlatMapFunction(granularityMs));
+
+ //聚合 + 拆分
+ SingleOutputStreamOperator rstStream = dataStream.keyBy(keySelector())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
+ .aggregate(aggregateFunction(), processWindowFunction());
+
+ //输出
+ rstStream.addSink(new SinkFunction() {
+ @Override
+ public void invoke(String value, Context context) throws Exception {
+ System.out.println(value);
+ }
+ });
+
+ environment.execute(config.get(JOB_NAME));
+ }
+
+ private static KeySelector> keySelector(){
+ return new KeySelector>() {
+ @Override
+ public Tuple7 getKey(Data data) throws Exception {
+ return new Tuple7<>(data.timestamp_ms, data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
+ }
+ };
+ }
+
+ private static FlatMapFunction parseFlatMapFunction(final long granularityMs){
+ return new RichFlatMapFunction() {
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ LOG.info("granularityMs:{}", granularityMs);
+ }
+
+ @Override
+ public void flatMap(String value, Collector out) throws Exception {
+ try {
+ // 先快速近似过滤
+ if(!value.contains("traffic_application_protocol_stat")){
+ return;
+ }
+ Data data = JSON.parseObject(value, Data.class);
+ // 精确过滤
+ if(!"traffic_application_protocol_stat".equals(data.name)){
+ return;
+ }
+ data.timestamp_ms = data.timestamp_ms / granularityMs * granularityMs;
+
+ String decodedPath = data.getDecoded_path();
+ if(StringUtils.isBlank(decodedPath)){
+ return;
+ }
+
+ String appFullPath = data.getApp();
+ if(StringUtils.isBlank(appFullPath)){
+ out.collect(data);
+ return;
+ }
+ // decoded_path和app进行拼接,格式化
+ String[] appSplits = appFullPath.split("\\.");
+ data.setApp(appSplits[appSplits.length -1]);
+ String firstAppProtocol = appSplits[0];
+ String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
+ if (endProtocol.equals(firstAppProtocol)) {
+ if(appSplits.length > 1){
+ decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
+ data.setDecoded_path(decodedPath);
+ }
+ }else{
+ decodedPath = decodedPath + "." + appFullPath;
+ data.setDecoded_path(decodedPath);
+ }
+ out.collect(data);
+ } catch (Exception e) {
+ LOG.error("parse error for value:" + value, e);
+ }
+ }
+ };
+ }
+
+ private static AggregateFunction aggregateFunction(){
+ return new AggregateFunction() {
+
+ @Override
+ public ResultData createAccumulator() {
+ return new ResultData();
+ }
+
+ @Override
+ public ResultData add(Data value, ResultData acc) {
+ acc.sessions = acc.sessions + value.sessions;
+ acc.in_bytes = acc.in_bytes + value.in_bytes;
+ acc.out_bytes = acc.out_bytes + value.out_bytes;
+ acc.in_pkts = acc.in_pkts + value.in_pkts;
+ acc.out_pkts = acc.out_pkts + value.out_pkts;
+ acc.c2s_pkts = acc.c2s_pkts + value.c2s_pkts;
+ acc.s2c_pkts = acc.s2c_pkts + value.s2c_pkts;
+ acc.c2s_bytes = acc.c2s_bytes + value.c2s_bytes;
+ acc.s2c_bytes = acc.s2c_bytes + value.s2c_bytes;
+ acc.c2s_fragments = acc.c2s_fragments + value.c2s_fragments;
+ acc.s2c_fragments = acc.s2c_fragments + value.s2c_fragments;
+ acc.c2s_tcp_lost_bytes = acc.c2s_tcp_lost_bytes + value.c2s_tcp_lost_bytes;
+ acc.s2c_tcp_lost_bytes = acc.s2c_tcp_lost_bytes + value.s2c_tcp_lost_bytes;
+ acc.c2s_tcp_ooorder_pkts = acc.c2s_tcp_ooorder_pkts + value.c2s_tcp_ooorder_pkts;
+ acc.s2c_tcp_ooorder_pkts = acc.s2c_tcp_ooorder_pkts + value.s2c_tcp_ooorder_pkts;
+ acc.c2s_tcp_retransmitted_pkts = acc.c2s_tcp_retransmitted_pkts + value.c2s_tcp_retransmitted_pkts;
+ acc.s2c_tcp_retransmitted_pkts = acc.s2c_tcp_retransmitted_pkts + value.s2c_tcp_retransmitted_pkts;
+ acc.c2s_tcp_retransmitted_bytes = acc.c2s_tcp_retransmitted_bytes + value.c2s_tcp_retransmitted_bytes;
+ acc.s2c_tcp_retransmitted_bytes = acc.s2c_tcp_retransmitted_bytes + value.s2c_tcp_retransmitted_bytes;
+ return acc;
+ }
+
+ @Override
+ public ResultData getResult(ResultData acc) {
+ return acc;
+ }
+
+ @Override
+ public ResultData merge(ResultData a, ResultData b) {
+ a.sessions = a.sessions + b.sessions;
+ a.in_bytes = a.in_bytes + b.in_bytes;
+ a.out_bytes = a.out_bytes + b.out_bytes;
+ a.in_pkts = a.in_pkts + b.in_pkts;
+ a.out_pkts = a.out_pkts + b.out_pkts;
+ a.c2s_pkts = a.c2s_pkts + b.c2s_pkts;
+ a.s2c_pkts = a.s2c_pkts + b.s2c_pkts;
+ a.c2s_bytes = a.c2s_bytes + b.c2s_bytes;
+ a.s2c_bytes = a.s2c_bytes + b.s2c_bytes;
+ a.c2s_fragments = a.c2s_fragments + b.c2s_fragments;
+ a.s2c_fragments = a.s2c_fragments + b.s2c_fragments;
+ a.c2s_tcp_lost_bytes = a.c2s_tcp_lost_bytes + b.c2s_tcp_lost_bytes;
+ a.s2c_tcp_lost_bytes = a.s2c_tcp_lost_bytes + b.s2c_tcp_lost_bytes;
+ a.c2s_tcp_ooorder_pkts = a.c2s_tcp_ooorder_pkts + b.c2s_tcp_ooorder_pkts;
+ a.s2c_tcp_ooorder_pkts = a.s2c_tcp_ooorder_pkts + b.s2c_tcp_ooorder_pkts;
+ a.c2s_tcp_retransmitted_pkts = a.c2s_tcp_retransmitted_pkts + b.c2s_tcp_retransmitted_pkts;
+ a.s2c_tcp_retransmitted_pkts = a.s2c_tcp_retransmitted_pkts + b.s2c_tcp_retransmitted_pkts;
+ a.c2s_tcp_retransmitted_bytes = a.c2s_tcp_retransmitted_bytes + b.c2s_tcp_retransmitted_bytes;
+ a.s2c_tcp_retransmitted_bytes = a.s2c_tcp_retransmitted_bytes + b.s2c_tcp_retransmitted_bytes;
+ return a;
+ }
+ };
+ }
+
+ private static ProcessWindowFunction, TimeWindow> processWindowFunction(){
+ return new ProcessWindowFunction, TimeWindow>() {
+ private String NAME = null;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ final Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
+ Preconditions.checkArgument(StringUtils.isNotBlank(NAME));
+ }
+
+ @Override
+ public void process(Tuple7 key, ProcessWindowFunction, TimeWindow>.Context context, Iterable elements, Collector out) throws Exception {
+ for (ResultData ele : elements) {
+ ele.name = NAME;
+ ele.timestamp_ms = key.f0;
+ ele.vsys_id = key.f1;
+ ele.device_id = key.f2;
+ ele.device_group = key.f3;
+ ele.data_center = key.f4;
+ ele.app_name = null;
+ String decodedPath = key.f5;
+ String app = key.f6;
+
+ // 拆分
+ int index = decodedPath.indexOf('.');
+ String subDecodedPath;
+ while (index > 0) {
+ subDecodedPath = decodedPath.substring(0, index);
+ ele.protocol_stack_id = subDecodedPath;
+ out.collect(JSON.toJSONString(ele));
+ index = decodedPath.indexOf('.', index + 1);
+ }
+
+ ele.app_name = app;
+ ele.protocol_stack_id = decodedPath;
+ out.collect(JSON.toJSONString(ele));
+ }
+ }
+ };
+ }
+
+ private static DataStream testSource(StreamExecutionEnvironment environment){
+ return environment.fromCollection(Arrays.asList(
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssh\",\"app\":\"ssh\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":720,\"out_bytes\":1200,\"in_pkts\":8,\"out_pkts\":16,\"c2s_pkts\":16,\"s2c_pkts\":8,\"c2s_bytes\":1200,\"s2c_bytes\":720,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"Gary-ApplicationTest\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":2536,\"out_bytes\":2237,\"in_pkts\":8,\"out_pkts\":7,\"c2s_pkts\":7,\"s2c_pkts\":8,\"c2s_bytes\":2237,\"s2c_bytes\":2536,\"c2s_tcp_retransmitted_pkts\":2,\"c2s_tcp_retransmitted_bytes\":120,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"ms_edge\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":326282,\"out_bytes\":8322,\"in_pkts\":241,\"out_pkts\":125,\"c2s_pkts\":125,\"s2c_pkts\":241,\"c2s_bytes\":8322,\"s2c_bytes\":326282,\"s2c_tcp_ooorder_pkts\":1,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"in_pkts\":1,\"s2c_pkts\":1,\"s2c_bytes\":66,\"timestamp_ms\":1719990000033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"port_443\",\"device_id\":\"9800165603191152\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990002033}",
+ "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"ssl.port_444\",\"device_id\":\"9800165603191153\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990003033}",
+ "{}"
+ ));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
index 28c01f5..4a170a8 100644
--- a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
+++ b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
@@ -5,7 +5,7 @@ import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
@@ -35,7 +35,7 @@ public class ResultFlatMap implements FlatMapFunction {
String[] protocolIds = protocolStackId.split(PROTOCOL_SPLITTER);
int protocolIdsNum = protocolIds.length;
for (int i = 0; i < protocolIdsNum - 1; i++) {
- if (StringUtil.isBlank(stringBuilder.toString())) {
+ if (StringUtils.isBlank(stringBuilder.toString())) {
stringBuilder.append(protocolIds[i]);
tags.setProtocol_stack_id(stringBuilder.toString());
metrics.setTags(tags);
diff --git a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
index 0f06a26..65302f5 100644
--- a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
+++ b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
@@ -7,7 +7,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
@@ -32,7 +32,7 @@ public class ParsingData extends ProcessFunction>.Context ctx, Collector> out) {
try {
- if (StringUtil.isNotBlank(value)) {
+ if (StringUtils.isNotBlank(value)) {
Object isProtocolData = JSONPath.eval(value, dataTypeExpr);
if (isProtocolData != null) {
JSONObject originalLog = JSON.parseObject(value);
@@ -44,7 +44,7 @@ public class ParsingData extends ProcessFunction(tags, fields, timestamp_ms));
@@ -85,7 +85,7 @@ public class ParsingData extends ProcessFunction