TSG-22155: data may be lost when device clocks are out of sync; switch to processing-time window aggregation
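Note on the approach: the fix floors each record's event timestamp to the configured granularity and moves it into the grouping key, so the processing-time tumbling window only decides when results are flushed, while the timestamp carried in the key decides which time bucket a record is counted in. Records from a device whose clock runs behind are then aggregated under their own bucket instead of being dropped as late data by an event-time watermark. A minimal runnable sketch of the bucketing idea (plain Java with simplified types, not the production topology; sample timestamps loosely follow the test source below):

import java.util.Map;
import java.util.TreeMap;

public class BucketKeyDemo {
    public static void main(String[] args) {
        long granularityMs = 1_000L; // granularity.second = 1
        long[] eventTimes = {1719990000033L, 1719990000999L, 1719990002033L};
        // Group by the floored event timestamp, just as the Tuple7 key does below.
        Map<Long, Integer> countsPerBucket = new TreeMap<>();
        for (long ts : eventTimes) {
            long bucket = ts / granularityMs * granularityMs; // floor to bucket start
            countsPerBucket.merge(bucket, 1, Integer::sum);
        }
        // Prints {1719990000000=2, 1719990002000=1}: each record lands in the
        // bucket of its own clock, however skewed, so nothing is discarded.
        System.out.println(countsPerBucket);
    }
}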
pom.xml
@@ -6,7 +6,7 @@

     <groupId>com.zdjizhi</groupId>
     <artifactId>app-protocol-stat-traffic-merge</artifactId>
-    <version>2.3.0</version>
+    <version>2.3.1</version>

     <name>app-protocol-stat-traffic-merge</name>
     <url>http://www.example.com</url>
@@ -115,20 +115,20 @@
     </build>

     <dependencies>
-        <dependency>
+        <!--<dependency>
             <groupId>cglib</groupId>
             <artifactId>cglib-nodep</artifactId>
             <version>3.2.4</version>
-        </dependency>
+        </dependency>-->

         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
             <version>5.3.2</version>
-            <scope>compile</scope>
+            <scope>test</scope>
         </dependency>

-        <dependency>
+        <!-- <dependency>
             <groupId>com.zdjizhi</groupId>
             <artifactId>galaxy</artifactId>
             <version>${zdjz.tools.version}</version>
@@ -142,7 +142,7 @@
                     <groupId>org.slf4j</groupId>
                 </exclusion>
             </exclusions>
-        </dependency>
+        </dependency>-->

         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
         <dependency>
@@ -175,6 +175,18 @@
             <artifactId>flink-connector-kafka_2.12</artifactId>
             <version>${flink.version}</version>
             <!--<scope>${scope.type}</scope>-->
+            <exclusions>
+                <exclusion>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy-java</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.8.3</version>
+        </dependency>

         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
@@ -192,11 +204,11 @@
         </dependency>

         <!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
-        <dependency>
+        <!-- <dependency>
             <groupId>org.jasypt</groupId>
             <artifactId>jasypt</artifactId>
             <version>${jasypt.version}</version>
-        </dependency>
+        </dependency>-->

         <dependency>
             <groupId>junit</groupId>
@@ -35,6 +35,11 @@ public class MergeConfigs {
                     .noDefaultValue()
                     .withDescription("The Kafka topic used in the sink.");

+    public static final ConfigOption<Long> GRANULARITY_SECOND =
+            ConfigOptions.key("granularity.second")
+                    .longType()
+                    .defaultValue(1L)
+                    .withDescription("granularity second");
+
     public static final ConfigOption<Integer> COUNT_WINDOW_TIME =
             ConfigOptions.key("count.window.time")
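The new granularity.second option sets the bucket width used for the timestamp flooring; the job converts it to milliseconds and bounds it to 1-60 seconds before use. A small sketch of reading and validating the option (standalone Java against Flink's Configuration/ConfigOptions API; the value 5 is illustrative):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class GranularityDemo {
    // Same shape as the option added above.
    static final ConfigOption<Long> GRANULARITY_SECOND =
            ConfigOptions.key("granularity.second")
                    .longType()
                    .defaultValue(1L)
                    .withDescription("granularity second");

    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set(GRANULARITY_SECOND, 5L); // e.g. 5-second buckets
        long granularityMs = config.get(GRANULARITY_SECOND) * 1000L;
        // Mirrors the Preconditions.checkArgument bounds used in the job.
        if (granularityMs < 1000L || granularityMs > 60000L) {
            throw new IllegalArgumentException("granularity.second must be 1..60");
        }
        System.out.println(granularityMs); // 5000
    }
}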
@@ -1,27 +1,24 @@
 package com.zdjizhi.topology;

 import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONPath;
-import com.alibaba.fastjson2.JSONReader;
 import com.zdjizhi.common.config.MergeConfigs;
 import com.zdjizhi.common.config.MergeConfiguration;
 import com.zdjizhi.common.pojo.*;
 import com.zdjizhi.utils.kafka.KafkaConsumer;
 import com.zdjizhi.utils.kafka.KafkaProducer;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.eventtime.*;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
@@ -29,7 +26,6 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.time.Duration;
 import java.util.Arrays;

 import static com.zdjizhi.common.config.MergeConfigs.*;
@@ -57,23 +53,22 @@ public class ApplicationProtocolTopology {
         environment.getConfig().setGlobalJobParameters(config);
         final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);

-        // watermark
-        WatermarkStrategy<Data> strategyForSession = WatermarkStrategy
-                .<Data>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
-                .withTimestampAssigner((element, timestamp) -> element.getTimestamp_ms());
-
         // source
         DataStream<String> streamSource = environment.addSource(
                 KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
                         config.get(SOURCE_KAFKA_TOPIC),
                         config.get(STARTUP_MODE)));

+        final long granularityMs = config.get(GRANULARITY_SECOND) * 1000L;
+        Preconditions.checkArgument(granularityMs >= 1000L && granularityMs <= 60000L);
+        LOG.warn("granularityMs:{}", granularityMs);
+
         // parse input
-        SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction()).assignTimestampsAndWatermarks(strategyForSession);
+        SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction(granularityMs));

         // aggregate + split
         SingleOutputStreamOperator<String> rstStream = dataStream.keyBy(keySelector())
-                .window(TumblingEventTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
                 .aggregate(aggregateFunction(), processWindowFunction());

         // sink
@@ -82,35 +77,38 @@ public class ApplicationProtocolTopology {
         environment.execute(config.get(JOB_NAME));
     }

-    private static KeySelector<Data, Tuple6<Integer, String, String, String, String, String>> keySelector(){
-        return new KeySelector<Data, Tuple6<Integer, String, String, String, String, String>>() {
+    private static KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>> keySelector(){
+        return new KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>>() {
             @Override
-            public Tuple6<Integer, String, String, String, String, String> getKey(Data data) throws Exception {
-                return new Tuple6<>(data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
+            public Tuple7<Long, Integer, String, String, String, String, String> getKey(Data data) throws Exception {
+                return new Tuple7<>(data.timestamp_ms, data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
             }
         };
     }

-    private static FlatMapFunction<String, Data> parseFlatMapFunction(){
+    private static FlatMapFunction<String, Data> parseFlatMapFunction(final long granularityMs){
         return new RichFlatMapFunction<String, Data>() {
-            private JSONPath namePath;

             @Override
             public void open(Configuration parameters) throws Exception {
                 super.open(parameters);
-                namePath = JSONPath.of("$.name");
+                LOG.warn("granularityMs:{}", granularityMs);
             }

             @Override
             public void flatMap(String value, Collector<Data> out) throws Exception {
-                JSONReader parser = JSONReader.of(value);
                 try {
-                    Object name = namePath.extract(parser);
-                    if(!"traffic_application_protocol_stat".equals(name)){
+                    // fast approximate filter first
+                    if(!value.contains("traffic_application_protocol_stat")){
                         return;
                     }

                     Data data = JSON.parseObject(value, Data.class);
+                    // exact filter
+                    if(!"traffic_application_protocol_stat".equals(data.name)){
+                        return;
+                    }
+                    data.timestamp_ms = data.timestamp_ms / granularityMs * granularityMs;

                     String decodedPath = data.getDecoded_path();
                     if(StringUtils.isBlank(decodedPath)){
                         return;
@@ -138,8 +136,6 @@ public class ApplicationProtocolTopology {
                     out.collect(data);
                 } catch (Exception e) {
                     LOG.error("parse error for value:" + value, e);
-                } finally {
-                    parser.close();
                 }
             }
         };
@@ -208,8 +204,8 @@ public class ApplicationProtocolTopology {
         };
     }

-    private static ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
-        return new ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>() {
+    private static ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
+        return new ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>() {
             private String NAME = null;

             @Override
@@ -221,18 +217,17 @@ public class ApplicationProtocolTopology {
             }

             @Override
-            public void process(Tuple6<Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
-                long timestamp_ms = context.window().getStart();
+            public void process(Tuple7<Long, Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
                 for (ResultData ele : elements) {
-                    ele.timestamp_ms = timestamp_ms;
                     ele.name = NAME;
-                    ele.vsys_id = key.f0;
-                    ele.device_id = key.f1;
-                    ele.device_group = key.f2;
-                    ele.data_center = key.f3;
+                    ele.timestamp_ms = key.f0;
+                    ele.vsys_id = key.f1;
+                    ele.device_id = key.f2;
+                    ele.device_group = key.f3;
+                    ele.data_center = key.f4;
                     ele.app_name = null;
-                    String decodedPath = key.f4;
-                    String app = key.f5;
+                    String decodedPath = key.f5;
+                    String app = key.f6;

                     // split by protocol-stack prefix
                     int index = decodedPath.indexOf('.');
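For context on that split loop (shown in full in the new ApplicationProtocolTopologyEventTime file below): each aggregated record is fanned out once per dot-separated prefix of decoded_path, with app_name attached only to the full path. A standalone sketch of the fan-out (plain Java, hypothetical sample values):

import java.util.ArrayList;
import java.util.List;

public class StackSplitDemo {
    public static void main(String[] args) {
        String decodedPath = "ETHERNET.IPv4.TCP.ssl";
        String app = "port_443";
        List<String> emitted = new ArrayList<>();
        // Same indexOf walk as the job's while loop: emit every proper prefix...
        int index = decodedPath.indexOf('.');
        while (index > 0) {
            emitted.add(decodedPath.substring(0, index) + " app_name=null");
            index = decodedPath.indexOf('.', index + 1);
        }
        // ...then the full path, which alone carries the app name.
        emitted.add(decodedPath + " app_name=" + app);
        // ETHERNET, ETHERNET.IPv4, ETHERNET.IPv4.TCP, ETHERNET.IPv4.TCP.ssl
        emitted.forEach(System.out::println);
    }
}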
@@ -0,0 +1,267 @@
package com.zdjizhi.topology;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONPath;
import com.alibaba.fastjson2.JSONReader;
import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.config.MergeConfiguration;
import com.zdjizhi.common.pojo.Data;
import com.zdjizhi.common.pojo.ResultData;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;

import static com.zdjizhi.common.config.MergeConfigs.*;

/**
 * @author lifengchao
 * @Package com.zdjizhi.topology
 * @Description:
 * @date 2024/7/23 11:20
 */
public class ApplicationProtocolTopologyEventTime {
    static final Logger LOG = LoggerFactory.getLogger(ApplicationProtocolTopologyEventTime.class);

    public static void main(String[] args) throws Exception{
        // param check
        if (args.length < 1) {
            throw new IllegalArgumentException("Error: Not found properties path. " +
                    "\nUsage: flink -c xxx xxx.jar app.properties.");
        }

        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
        final Configuration config = tool.getConfiguration();
        environment.getConfig().setGlobalJobParameters(config);
        final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);

        // watermark
        WatermarkStrategy<Data> strategyForSession = WatermarkStrategy
                .<Data>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
                .withTimestampAssigner((element, timestamp) -> element.getTimestamp_ms());

        // source
        DataStream<String> streamSource = environment.addSource(
                KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
                        config.get(SOURCE_KAFKA_TOPIC),
                        config.get(STARTUP_MODE)));

        // parse input
        SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction()).assignTimestampsAndWatermarks(strategyForSession);

        // aggregate + split
        SingleOutputStreamOperator<String> rstStream = dataStream.keyBy(keySelector())
                .window(TumblingEventTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
                .aggregate(aggregateFunction(), processWindowFunction());

        // sink
        rstStream.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX), config.get(SINK_KAFKA_TOPIC), config.get(LOG_FAILURES_ONLY)));

        environment.execute(config.get(JOB_NAME));
    }

    private static KeySelector<Data, Tuple6<Integer, String, String, String, String, String>> keySelector(){
        return new KeySelector<Data, Tuple6<Integer, String, String, String, String, String>>() {
            @Override
            public Tuple6<Integer, String, String, String, String, String> getKey(Data data) throws Exception {
                return new Tuple6<>(data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
            }
        };
    }

    private static FlatMapFunction<String, Data> parseFlatMapFunction(){
        return new RichFlatMapFunction<String, Data>() {
            private JSONPath namePath;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                namePath = JSONPath.of("$.name");
            }

            @Override
            public void flatMap(String value, Collector<Data> out) throws Exception {
                JSONReader parser = JSONReader.of(value);
                try {
                    Object name = namePath.extract(parser);
                    if(!"traffic_application_protocol_stat".equals(name)){
                        return;
                    }

                    Data data = JSON.parseObject(value, Data.class);
                    String decodedPath = data.getDecoded_path();
                    if(StringUtils.isBlank(decodedPath)){
                        return;
                    }

                    String appFullPath = data.getApp();
                    if(StringUtils.isBlank(appFullPath)){
                        out.collect(data);
                        return;
                    }
                    // splice decoded_path with app and normalize
                    String[] appSplits = appFullPath.split("\\.");
                    data.setApp(appSplits[appSplits.length -1]);
                    String firstAppProtocol = appSplits[0];
                    String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
                    if (endProtocol.equals(firstAppProtocol)) {
                        if(appSplits.length > 1){
                            decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
                            data.setDecoded_path(decodedPath);
                        }
                    }else{
                        decodedPath = decodedPath + "." + appFullPath;
                        data.setDecoded_path(decodedPath);
                    }
                    out.collect(data);
                } catch (Exception e) {
                    LOG.error("parse error for value:" + value, e);
                } finally {
                    parser.close();
                }
            }
        };
    }

    private static AggregateFunction<Data, ResultData, ResultData> aggregateFunction(){
        return new AggregateFunction<Data, ResultData, ResultData>() {

            @Override
            public ResultData createAccumulator() {
                return new ResultData();
            }

            @Override
            public ResultData add(Data value, ResultData acc) {
                acc.sessions = acc.sessions + value.sessions;
                acc.in_bytes = acc.in_bytes + value.in_bytes;
                acc.out_bytes = acc.out_bytes + value.out_bytes;
                acc.in_pkts = acc.in_pkts + value.in_pkts;
                acc.out_pkts = acc.out_pkts + value.out_pkts;
                acc.c2s_pkts = acc.c2s_pkts + value.c2s_pkts;
                acc.s2c_pkts = acc.s2c_pkts + value.s2c_pkts;
                acc.c2s_bytes = acc.c2s_bytes + value.c2s_bytes;
                acc.s2c_bytes = acc.s2c_bytes + value.s2c_bytes;
                acc.c2s_fragments = acc.c2s_fragments + value.c2s_fragments;
                acc.s2c_fragments = acc.s2c_fragments + value.s2c_fragments;
                acc.c2s_tcp_lost_bytes = acc.c2s_tcp_lost_bytes + value.c2s_tcp_lost_bytes;
                acc.s2c_tcp_lost_bytes = acc.s2c_tcp_lost_bytes + value.s2c_tcp_lost_bytes;
                acc.c2s_tcp_ooorder_pkts = acc.c2s_tcp_ooorder_pkts + value.c2s_tcp_ooorder_pkts;
                acc.s2c_tcp_ooorder_pkts = acc.s2c_tcp_ooorder_pkts + value.s2c_tcp_ooorder_pkts;
                acc.c2s_tcp_retransmitted_pkts = acc.c2s_tcp_retransmitted_pkts + value.c2s_tcp_retransmitted_pkts;
                acc.s2c_tcp_retransmitted_pkts = acc.s2c_tcp_retransmitted_pkts + value.s2c_tcp_retransmitted_pkts;
                acc.c2s_tcp_retransmitted_bytes = acc.c2s_tcp_retransmitted_bytes + value.c2s_tcp_retransmitted_bytes;
                acc.s2c_tcp_retransmitted_bytes = acc.s2c_tcp_retransmitted_bytes + value.s2c_tcp_retransmitted_bytes;
                return acc;
            }

            @Override
            public ResultData getResult(ResultData acc) {
                return acc;
            }

            @Override
            public ResultData merge(ResultData a, ResultData b) {
                a.sessions = a.sessions + b.sessions;
                a.in_bytes = a.in_bytes + b.in_bytes;
                a.out_bytes = a.out_bytes + b.out_bytes;
                a.in_pkts = a.in_pkts + b.in_pkts;
                a.out_pkts = a.out_pkts + b.out_pkts;
                a.c2s_pkts = a.c2s_pkts + b.c2s_pkts;
                a.s2c_pkts = a.s2c_pkts + b.s2c_pkts;
                a.c2s_bytes = a.c2s_bytes + b.c2s_bytes;
                a.s2c_bytes = a.s2c_bytes + b.s2c_bytes;
                a.c2s_fragments = a.c2s_fragments + b.c2s_fragments;
                a.s2c_fragments = a.s2c_fragments + b.s2c_fragments;
                a.c2s_tcp_lost_bytes = a.c2s_tcp_lost_bytes + b.c2s_tcp_lost_bytes;
                a.s2c_tcp_lost_bytes = a.s2c_tcp_lost_bytes + b.s2c_tcp_lost_bytes;
                a.c2s_tcp_ooorder_pkts = a.c2s_tcp_ooorder_pkts + b.c2s_tcp_ooorder_pkts;
                a.s2c_tcp_ooorder_pkts = a.s2c_tcp_ooorder_pkts + b.s2c_tcp_ooorder_pkts;
                a.c2s_tcp_retransmitted_pkts = a.c2s_tcp_retransmitted_pkts + b.c2s_tcp_retransmitted_pkts;
                a.s2c_tcp_retransmitted_pkts = a.s2c_tcp_retransmitted_pkts + b.s2c_tcp_retransmitted_pkts;
                a.c2s_tcp_retransmitted_bytes = a.c2s_tcp_retransmitted_bytes + b.c2s_tcp_retransmitted_bytes;
                a.s2c_tcp_retransmitted_bytes = a.s2c_tcp_retransmitted_bytes + b.s2c_tcp_retransmitted_bytes;
                return a;
            }
        };
    }

    private static ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
        return new ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>() {
            private String NAME = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                final Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
                NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
                Preconditions.checkArgument(StringUtils.isNotBlank(NAME));
            }

            @Override
            public void process(Tuple6<Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple6<Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
                long timestamp_ms = context.window().getStart();
                for (ResultData ele : elements) {
                    ele.timestamp_ms = timestamp_ms;
                    ele.name = NAME;
                    ele.vsys_id = key.f0;
                    ele.device_id = key.f1;
                    ele.device_group = key.f2;
                    ele.data_center = key.f3;
                    ele.app_name = null;
                    String decodedPath = key.f4;
                    String app = key.f5;

                    // split by protocol-stack prefix
                    int index = decodedPath.indexOf('.');
                    String subDecodedPath;
                    while (index > 0) {
                        subDecodedPath = decodedPath.substring(0, index);
                        ele.protocol_stack_id = subDecodedPath;
                        out.collect(JSON.toJSONString(ele));
                        index = decodedPath.indexOf('.', index + 1);
                    }

                    ele.app_name = app;
                    ele.protocol_stack_id = decodedPath;
                    out.collect(JSON.toJSONString(ele));
                }
            }
        };
    }

    private static DataStream<String> testSource(StreamExecutionEnvironment environment){
        return environment.fromCollection(Arrays.asList(
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssh\",\"app\":\"ssh\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":720,\"out_bytes\":1200,\"in_pkts\":8,\"out_pkts\":16,\"c2s_pkts\":16,\"s2c_pkts\":8,\"c2s_bytes\":1200,\"s2c_bytes\":720,\"timestamp_ms\":1719990000033}",
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"Gary-ApplicationTest\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":2536,\"out_bytes\":2237,\"in_pkts\":8,\"out_pkts\":7,\"c2s_pkts\":7,\"s2c_pkts\":8,\"c2s_bytes\":2237,\"s2c_bytes\":2536,\"c2s_tcp_retransmitted_pkts\":2,\"c2s_tcp_retransmitted_bytes\":120,\"timestamp_ms\":1719990000033}",
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"ms_edge\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":326282,\"out_bytes\":8322,\"in_pkts\":241,\"out_pkts\":125,\"c2s_pkts\":125,\"s2c_pkts\":241,\"c2s_bytes\":8322,\"s2c_bytes\":326282,\"s2c_tcp_ooorder_pkts\":1,\"timestamp_ms\":1719990000033}",
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"in_pkts\":1,\"s2c_pkts\":1,\"s2c_bytes\":66,\"timestamp_ms\":1719990000033}",
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}",
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"ssl.port_444\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990000033}",
                "{}"
        ));
    }
}
@@ -0,0 +1,269 @@
package com.zdjizhi.topology;

import com.alibaba.fastjson2.JSON;
import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.config.MergeConfiguration;
import com.zdjizhi.common.pojo.Data;
import com.zdjizhi.common.pojo.ResultData;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

import static com.zdjizhi.common.config.MergeConfigs.*;

/**
 * @author lifengchao
 * @Package com.zdjizhi.topology
 * @Description:
 * @date 2024/7/23 11:20
 */
public class ApplicationProtocolTopologyTest {
    static final Logger LOG = LoggerFactory.getLogger(ApplicationProtocolTopologyTest.class);

    public static void main(String[] args) throws Exception{
        // param check
        if (args.length < 1) {
            throw new IllegalArgumentException("Error: Not found properties path. " +
                    "\nUsage: flink -c xxx xxx.jar app.properties.");
        }

        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
        final Configuration config = tool.getConfiguration();
        environment.getConfig().setGlobalJobParameters(config);
        final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);

        // source
        //DataStream<String> streamSource = testSource(environment);
        DataStream<String> streamSource = environment.addSource(
                KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
                        config.get(SOURCE_KAFKA_TOPIC),
                        config.get(STARTUP_MODE)));

        final long granularityMs = config.get(GRANULARITY_SECOND) * 1000L;
        Preconditions.checkArgument(granularityMs >= 1000L && granularityMs <= 60000L);
        LOG.info("granularityMs:{}", granularityMs);

        // parse input
        SingleOutputStreamOperator<Data> dataStream = streamSource.flatMap(parseFlatMapFunction(granularityMs));

        // aggregate + split
        SingleOutputStreamOperator<String> rstStream = dataStream.keyBy(keySelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(config.get(COUNT_WINDOW_TIME))))
                .aggregate(aggregateFunction(), processWindowFunction());

        // sink
        rstStream.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                System.out.println(value);
            }
        });

        environment.execute(config.get(JOB_NAME));
    }

    private static KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>> keySelector(){
        return new KeySelector<Data, Tuple7<Long, Integer, String, String, String, String, String>>() {
            @Override
            public Tuple7<Long, Integer, String, String, String, String, String> getKey(Data data) throws Exception {
                return new Tuple7<>(data.timestamp_ms, data.vsys_id, data.device_id, data.device_group, data.data_center, data.decoded_path, data.app);
            }
        };
    }

    private static FlatMapFunction<String, Data> parseFlatMapFunction(final long granularityMs){
        return new RichFlatMapFunction<String, Data>() {

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                LOG.info("granularityMs:{}", granularityMs);
            }

            @Override
            public void flatMap(String value, Collector<Data> out) throws Exception {
                try {
                    // fast approximate filter first
                    if(!value.contains("traffic_application_protocol_stat")){
                        return;
                    }
                    Data data = JSON.parseObject(value, Data.class);
                    // exact filter
                    if(!"traffic_application_protocol_stat".equals(data.name)){
                        return;
                    }
                    data.timestamp_ms = data.timestamp_ms / granularityMs * granularityMs;

                    String decodedPath = data.getDecoded_path();
                    if(StringUtils.isBlank(decodedPath)){
                        return;
                    }

                    String appFullPath = data.getApp();
                    if(StringUtils.isBlank(appFullPath)){
                        out.collect(data);
                        return;
                    }
                    // splice decoded_path with app and normalize
                    String[] appSplits = appFullPath.split("\\.");
                    data.setApp(appSplits[appSplits.length -1]);
                    String firstAppProtocol = appSplits[0];
                    String endProtocol = decodedPath.substring(decodedPath.lastIndexOf(".") + 1);
                    if (endProtocol.equals(firstAppProtocol)) {
                        if(appSplits.length > 1){
                            decodedPath = decodedPath + appFullPath.substring(appFullPath.indexOf("."));
                            data.setDecoded_path(decodedPath);
                        }
                    }else{
                        decodedPath = decodedPath + "." + appFullPath;
                        data.setDecoded_path(decodedPath);
                    }
                    out.collect(data);
                } catch (Exception e) {
                    LOG.error("parse error for value:" + value, e);
                }
            }
        };
    }

    private static AggregateFunction<Data, ResultData, ResultData> aggregateFunction(){
        return new AggregateFunction<Data, ResultData, ResultData>() {

            @Override
            public ResultData createAccumulator() {
                return new ResultData();
            }

            @Override
            public ResultData add(Data value, ResultData acc) {
                acc.sessions = acc.sessions + value.sessions;
                acc.in_bytes = acc.in_bytes + value.in_bytes;
                acc.out_bytes = acc.out_bytes + value.out_bytes;
                acc.in_pkts = acc.in_pkts + value.in_pkts;
                acc.out_pkts = acc.out_pkts + value.out_pkts;
                acc.c2s_pkts = acc.c2s_pkts + value.c2s_pkts;
                acc.s2c_pkts = acc.s2c_pkts + value.s2c_pkts;
                acc.c2s_bytes = acc.c2s_bytes + value.c2s_bytes;
                acc.s2c_bytes = acc.s2c_bytes + value.s2c_bytes;
                acc.c2s_fragments = acc.c2s_fragments + value.c2s_fragments;
                acc.s2c_fragments = acc.s2c_fragments + value.s2c_fragments;
                acc.c2s_tcp_lost_bytes = acc.c2s_tcp_lost_bytes + value.c2s_tcp_lost_bytes;
                acc.s2c_tcp_lost_bytes = acc.s2c_tcp_lost_bytes + value.s2c_tcp_lost_bytes;
                acc.c2s_tcp_ooorder_pkts = acc.c2s_tcp_ooorder_pkts + value.c2s_tcp_ooorder_pkts;
                acc.s2c_tcp_ooorder_pkts = acc.s2c_tcp_ooorder_pkts + value.s2c_tcp_ooorder_pkts;
                acc.c2s_tcp_retransmitted_pkts = acc.c2s_tcp_retransmitted_pkts + value.c2s_tcp_retransmitted_pkts;
                acc.s2c_tcp_retransmitted_pkts = acc.s2c_tcp_retransmitted_pkts + value.s2c_tcp_retransmitted_pkts;
                acc.c2s_tcp_retransmitted_bytes = acc.c2s_tcp_retransmitted_bytes + value.c2s_tcp_retransmitted_bytes;
                acc.s2c_tcp_retransmitted_bytes = acc.s2c_tcp_retransmitted_bytes + value.s2c_tcp_retransmitted_bytes;
                return acc;
            }

            @Override
            public ResultData getResult(ResultData acc) {
                return acc;
            }

            @Override
            public ResultData merge(ResultData a, ResultData b) {
                a.sessions = a.sessions + b.sessions;
                a.in_bytes = a.in_bytes + b.in_bytes;
                a.out_bytes = a.out_bytes + b.out_bytes;
                a.in_pkts = a.in_pkts + b.in_pkts;
                a.out_pkts = a.out_pkts + b.out_pkts;
                a.c2s_pkts = a.c2s_pkts + b.c2s_pkts;
                a.s2c_pkts = a.s2c_pkts + b.s2c_pkts;
                a.c2s_bytes = a.c2s_bytes + b.c2s_bytes;
                a.s2c_bytes = a.s2c_bytes + b.s2c_bytes;
                a.c2s_fragments = a.c2s_fragments + b.c2s_fragments;
                a.s2c_fragments = a.s2c_fragments + b.s2c_fragments;
                a.c2s_tcp_lost_bytes = a.c2s_tcp_lost_bytes + b.c2s_tcp_lost_bytes;
                a.s2c_tcp_lost_bytes = a.s2c_tcp_lost_bytes + b.s2c_tcp_lost_bytes;
                a.c2s_tcp_ooorder_pkts = a.c2s_tcp_ooorder_pkts + b.c2s_tcp_ooorder_pkts;
                a.s2c_tcp_ooorder_pkts = a.s2c_tcp_ooorder_pkts + b.s2c_tcp_ooorder_pkts;
                a.c2s_tcp_retransmitted_pkts = a.c2s_tcp_retransmitted_pkts + b.c2s_tcp_retransmitted_pkts;
                a.s2c_tcp_retransmitted_pkts = a.s2c_tcp_retransmitted_pkts + b.s2c_tcp_retransmitted_pkts;
                a.c2s_tcp_retransmitted_bytes = a.c2s_tcp_retransmitted_bytes + b.c2s_tcp_retransmitted_bytes;
                a.s2c_tcp_retransmitted_bytes = a.s2c_tcp_retransmitted_bytes + b.s2c_tcp_retransmitted_bytes;
                return a;
            }
        };
    }

    private static ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow> processWindowFunction(){
        return new ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>() {
            private String NAME = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                final Configuration configuration = (Configuration) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
                NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
                Preconditions.checkArgument(StringUtils.isNotBlank(NAME));
            }

            @Override
            public void process(Tuple7<Long, Integer, String, String, String, String, String> key, ProcessWindowFunction<ResultData, String, Tuple7<Long, Integer, String, String, String, String, String>, TimeWindow>.Context context, Iterable<ResultData> elements, Collector<String> out) throws Exception {
                for (ResultData ele : elements) {
                    ele.name = NAME;
                    ele.timestamp_ms = key.f0;
                    ele.vsys_id = key.f1;
                    ele.device_id = key.f2;
                    ele.device_group = key.f3;
                    ele.data_center = key.f4;
                    ele.app_name = null;
                    String decodedPath = key.f5;
                    String app = key.f6;

                    // split by protocol-stack prefix
                    int index = decodedPath.indexOf('.');
                    String subDecodedPath;
                    while (index > 0) {
                        subDecodedPath = decodedPath.substring(0, index);
                        ele.protocol_stack_id = subDecodedPath;
                        out.collect(JSON.toJSONString(ele));
                        index = decodedPath.indexOf('.', index + 1);
                    }

                    ele.app_name = app;
                    ele.protocol_stack_id = decodedPath;
                    out.collect(JSON.toJSONString(ele));
                }
            }
        };
    }

    private static DataStream<String> testSource(StreamExecutionEnvironment environment){
        return environment.fromCollection(Arrays.asList(
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssh\",\"app\":\"ssh\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":720,\"out_bytes\":1200,\"in_pkts\":8,\"out_pkts\":16,\"c2s_pkts\":16,\"s2c_pkts\":8,\"c2s_bytes\":1200,\"s2c_bytes\":720,\"timestamp_ms\":1719990000033}",
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"Gary-ApplicationTest\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":2536,\"out_bytes\":2237,\"in_pkts\":8,\"out_pkts\":7,\"c2s_pkts\":7,\"s2c_pkts\":8,\"c2s_bytes\":2237,\"s2c_bytes\":2536,\"c2s_tcp_retransmitted_pkts\":2,\"c2s_tcp_retransmitted_bytes\":120,\"timestamp_ms\":1719990000033}",
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.http\",\"app\":\"ms_edge\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":326282,\"out_bytes\":8322,\"in_pkts\":241,\"out_pkts\":125,\"c2s_pkts\":125,\"s2c_pkts\":241,\"c2s_bytes\":8322,\"s2c_bytes\":326282,\"s2c_tcp_ooorder_pkts\":1,\"timestamp_ms\":1719990000033}",
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP\",\"app\":\"port_443\",\"device_id\":\"9800165603191151\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"in_pkts\":1,\"s2c_pkts\":1,\"s2c_bytes\":66,\"timestamp_ms\":1719990000033}",
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"port_443\",\"device_id\":\"9800165603191152\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990002033}",
                "{\"name\":\"traffic_application_protocol_stat\",\"decoded_path\":\"ETHERNET.IPv4.TCP.ssl\",\"app\":\"ssl.port_444\",\"device_id\":\"9800165603191153\",\"device_group\":\"group-xxg-tsgx\",\"data_center\":\"center-xxg-tsgx\",\"vsys_id\":1,\"in_bytes\":66,\"out_bytes\":60,\"in_pkts\":1,\"out_pkts\":1,\"c2s_pkts\":1,\"s2c_pkts\":1,\"c2s_bytes\":60,\"s2c_bytes\":66,\"c2s_tcp_retransmitted_pkts\":1,\"c2s_tcp_retransmitted_bytes\":60,\"timestamp_ms\":1719990003033}",
                "{}"
        ));
    }
}
@@ -5,7 +5,7 @@ import cn.hutool.log.LogFactory;
 import com.alibaba.fastjson2.JSONObject;
 import com.zdjizhi.common.pojo.Metrics;
 import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
@@ -35,7 +35,7 @@ public class ResultFlatMap implements FlatMapFunction<Metrics, String> {
         String[] protocolIds = protocolStackId.split(PROTOCOL_SPLITTER);
         int protocolIdsNum = protocolIds.length;
         for (int i = 0; i < protocolIdsNum - 1; i++) {
-            if (StringUtil.isBlank(stringBuilder.toString())) {
+            if (StringUtils.isBlank(stringBuilder.toString())) {
                 stringBuilder.append(protocolIds[i]);
                 tags.setProtocol_stack_id(stringBuilder.toString());
                 metrics.setTags(tags);
@@ -7,7 +7,7 @@ import com.alibaba.fastjson2.JSONObject;
 import com.alibaba.fastjson2.JSONPath;
 import com.zdjizhi.common.pojo.Fields;
 import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
@@ -32,7 +32,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo
     @Override
     public void processElement(String value, ProcessFunction<String, Tuple3<Tags, Fields, Long>>.Context ctx, Collector<Tuple3<Tags, Fields, Long>> out) {
         try {
-            if (StringUtil.isNotBlank(value)) {
+            if (StringUtils.isNotBlank(value)) {
                 Object isProtocolData = JSONPath.eval(value, dataTypeExpr);
                 if (isProtocolData != null) {
                     JSONObject originalLog = JSON.parseObject(value);
@@ -44,7 +44,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo
                     Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
                     Long timestamp_ms = originalLog.getLong("timestamp_ms");

-                    if (StringUtil.isNotBlank(tags.getProtocol_stack_id())) {
+                    if (StringUtils.isNotBlank(tags.getProtocol_stack_id())) {
                         joinProtocol(tags);

                     out.collect(new Tuple3<>(tags, fields, timestamp_ms));
@@ -85,7 +85,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Lo
     private static void joinProtocol(Tags tags) {
         String appFullPath = tags.getApp_name();

-        if (StringUtil.isNotBlank(appFullPath)) {
+        if (StringUtils.isNotBlank(appFullPath)) {
             String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
             tags.setApp_name(appName);
@@ -1,6 +1,6 @@
 package com.zdjizhi;

-import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;

 import java.util.Arrays;
@@ -23,7 +23,7 @@ public class ConventionalTest {
         String appName = "qq_r2";
         String[] protocolIds = protocol.split("\\.");
         for (String proto : protocolIds) {
-            if (StringUtil.isBlank(stringBuffer.toString())) {
+            if (StringUtils.isBlank(stringBuffer.toString())) {
                 stringBuffer.append(proto);
                 System.out.println(stringBuffer.toString());
             } else {
@@ -50,7 +50,7 @@ public class ConventionalTest {
         StringBuilder stringBuilder = new StringBuilder();
         for (int i = 0; i < protocol.split(str).length - 1; i++) {
             String value = protocol.split(str)[i];
-            if (StringUtil.isBlank(stringBuilder.toString())) {
+            if (StringUtils.isBlank(stringBuilder.toString())) {
                 stringBuilder.append(value);
                 System.out.println(stringBuilder.toString());
             } else {