Protocol and application statistics jobs now process data on event time, and the result data timestamps are at millisecond granularity. (TSG-16737)

qidaijie
2023-08-21 17:22:37 +08:00
parent 345b7fd601
commit 7b2302234a
8 changed files with 55 additions and 34 deletions
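
For context, below is a minimal, self-contained sketch of the pattern this commit switches to: a bounded-out-of-orderness watermark taken from the record's own timestamp, an event-time tumbling window, and a window-start timestamp kept in epoch milliseconds. It is illustrative only, not the project's actual classes; the class name, sample records, the Tuple3 layout (key, value, eventTimeMillis), and the 60s/15s constants (mirroring watermark.max.orderness and count.window.time) are assumptions for this example.

// Sketch only: event-time windows with millisecond window-start timestamps.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class EventTimeWindowSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Watermarks lag the largest seen event time by 60 seconds; the timestamp is read
        // from the record itself (field f2, epoch milliseconds), not from processing time.
        WatermarkStrategy<Tuple3<String, Long, Long>> strategy = WatermarkStrategy
                .<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                .withTimestampAssigner((element, recordTimestamp) -> element.f2);

        env.fromElements(
                        Tuple3.of("HTTP", 10L, 1692608400000L),
                        Tuple3.of("HTTP", 20L, 1692608405000L),
                        Tuple3.of("DNS", 5L, 1692608410000L))
                .assignTimestampsAndWatermarks(strategy)
                .keyBy(new KeySelector<Tuple3<String, Long, Long>, String>() {
                    @Override
                    public String getKey(Tuple3<String, Long, Long> value) {
                        return value.f0;
                    }
                })
                // 15-second tumbling window driven by event time, not wall-clock time.
                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                .reduce(
                        (a, b) -> Tuple3.of(a.f0, a.f1 + b.f1, a.f2),
                        new ProcessWindowFunction<Tuple3<String, Long, Long>, String, String, TimeWindow>() {
                            @Override
                            public void process(String key, Context ctx,
                                                Iterable<Tuple3<String, Long, Long>> elements,
                                                Collector<String> out) {
                                // Window start is already in epoch milliseconds; no "/ 1000"
                                // when the result timestamp should stay millisecond-granular.
                                long windowStartMillis = ctx.window().getStart();
                                for (Tuple3<String, Long, Long> t : elements) {
                                    out.collect(key + " sum=" + t.f1 + " @ " + windowStartMillis);
                                }
                            }
                        })
                .print();

        env.execute("event-time-sketch");
    }
}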

View File

@@ -6,7 +6,7 @@
 <groupId>com.zdjizhi</groupId>
 <artifactId>app-protocol-stat-traffic-merge</artifactId>
-<version>230710-Time</version>
+<version>230821</version>
 <name>app-protocol-stat-traffic-merge</name>
 <url>http://www.example.com</url>

View File

@@ -1,24 +1,23 @@
 #--------------------------------Address configuration------------------------------#
 #Management Kafka address
-source.kafka.servers=192.168.44.12:9094
+source.kafka.servers=192.168.44.85:9094
 #Management output Kafka address
-sink.kafka.servers=192.168.44.12:9094
+sink.kafka.servers=192.168.44.85:9094
 #--------------------------------HTTP------------------------------#
 #Kafka certificate path
 tools.library=D:\\workerspace\\dat
-#--------------------------------Kafka consumer group info------------------------------#
 #--------------------------------Kafka consumer group info------------------------------#
 #Kafka topic for receiving data
-source.kafka.topic=etl-test
+source.kafka.topic=NETWORK-TRAFFIC-METRICS
 #Output topic for enriched data
-sink.kafka.topic=etl-test-result
+sink.kafka.topic=test-result
 #Topic to read; the consumed offsets for this spout id are stored under the topology name, and the stored offset position determines where the next read resumes without duplicates
-group.id=livecharts-test-20230423-1
+group.id=livecharts-test-20230423-2
 #--------------------------------Topology configuration------------------------------#
 #Consumer parallelism
@@ -39,3 +38,5 @@ count.window.time=15
 #Data source: firewall or agent
 metrics.data.source=firewall
+#Watermark delay
+watermark.max.orderness=60

View File

@@ -27,6 +27,7 @@ public class GlobalConfig {
     public static final String MEASUREMENT_NAME = GlobalConfigLoad.getStringProperty(1, "measurement.name");
     public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism");
     public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism");
+    public static final Integer WARTERMARK_MAX_ORDERNESS = GlobalConfigLoad.getIntProperty(0, "watermark.max.orderness");
     public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time");
     public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library");
     public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism");

View File

@@ -6,21 +6,22 @@ import com.zdjizhi.common.config.GlobalConfig;
 import com.zdjizhi.common.pojo.Fields;
 import com.zdjizhi.common.pojo.Metrics;
 import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.functions.filter.DataTypeFilter;
 import com.zdjizhi.utils.functions.keyby.DimensionKeyBy;
-import com.zdjizhi.utils.functions.map.MetricsParseMap;
 import com.zdjizhi.utils.functions.map.ResultFlatMap;
+import com.zdjizhi.utils.functions.process.ParsingData;
 import com.zdjizhi.utils.functions.statistics.DispersionCountWindow;
 import com.zdjizhi.utils.functions.statistics.MergeCountWindow;
 import com.zdjizhi.utils.kafka.KafkaConsumer;
 import com.zdjizhi.utils.kafka.KafkaProducer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.eventtime.*;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import java.time.Duration;

 /**
  * @author qidaijie
@@ -35,33 +36,39 @@ public class ApplicationProtocolTopology {
         try {
             final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-            //Parse raw logs
+            WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy
+                    .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(GlobalConfig.WARTERMARK_MAX_ORDERNESS))
+                    .withTimestampAssigner((element, timestamp) -> element.f2);
+            //Data source
             DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
                     .setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC);
-            SingleOutputStreamOperator<String> appProtocolFilter = streamSource.filter(new DataTypeFilter())
-                    .name("appProtocolFilter").setParallelism(GlobalConfig.SOURCE_PARALLELISM);
-            SingleOutputStreamOperator<Tuple2<Tags, Fields>> parseDataMap = appProtocolFilter.map(new MetricsParseMap())
-                    .name("ParseDataMap").setParallelism(GlobalConfig.PARSE_PARALLELISM);
-            SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataMap.keyBy(new DimensionKeyBy())
-                    .window(TumblingProcessingTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
+            //Parse data
+            SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData())
+                    .assignTimestampsAndWatermarks(strategyForSession)
+                    .name("ParseDataProcess")
+                    .setParallelism(GlobalConfig.PARSE_PARALLELISM);
+            //Incremental aggregation window
+            SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
+                    .window(TumblingEventTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
                     .reduce(new DispersionCountWindow(), new MergeCountWindow())
                     .name("DispersionCountWindow")
                     .setParallelism(GlobalConfig.WINDOW_PARALLELISM);
+            //Split data
             SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
                     .name("ResultFlatMap").setParallelism(GlobalConfig.SINK_PARALLELISM);
+            //Output
             resultFlatMap.addSink(KafkaProducer.getKafkaProducer())
                     .setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC);
             environment.execute(args[0]);
         } catch (Exception e) {
-            logger.error("This Flink task start ERROR! Exception information is :" + e);
+            logger.error("This Flink task start ERROR! Exception information is :");
+            e.printStackTrace();
         }
     }

View File

@@ -4,6 +4,9 @@ import com.zdjizhi.common.pojo.Fields;
 import com.zdjizhi.common.pojo.Tags;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.sql.Timestamp;

 /**
  * @author qidaijie
@@ -11,10 +14,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
  * @Description:
  * @date 2021/7/2112:13
  */
-public class DimensionKeyBy implements KeySelector<Tuple2<Tags, Fields>, String> {
+public class DimensionKeyBy implements KeySelector<Tuple3<Tags, Fields, Long>, String> {
     @Override
-    public String getKey(Tuple2<Tags, Fields> value) throws Exception {
+    public String getKey(Tuple3<Tags, Fields, Long> value) throws Exception {
         //Group by the key concatenated from the map
         return value.f0.toString();
     }

View File

@@ -3,10 +3,12 @@ package com.zdjizhi.utils.functions.statistics;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.zdjizhi.common.pojo.Fields;
+import com.zdjizhi.common.pojo.Metrics;
 import com.zdjizhi.common.pojo.Tags;
 import com.zdjizhi.utils.general.MetricUtil;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;

 /**
  * @author qidaijie
@@ -14,21 +16,23 @@ import org.apache.flink.api.java.tuple.Tuple2;
  * @Description:
  * @date 2023/4/2314:02
  */
-public class DispersionCountWindow implements ReduceFunction<Tuple2<Tags, Fields>> {
+public class DispersionCountWindow implements ReduceFunction<Tuple3<Tags, Fields, Long>> {
     private static final Log logger = LogFactory.get();

     @Override
-    public Tuple2<Tags, Fields> reduce(Tuple2<Tags, Fields> value1, Tuple2<Tags, Fields> value2) throws Exception {
+    public Tuple3<Tags, Fields, Long> reduce(Tuple3<Tags, Fields, Long> value1, Tuple3<Tags, Fields, Long> value2) throws Exception {
         try {
             Fields cacheData = value1.f1;
             Fields newData = value2.f1;
             Fields metricsResult = MetricUtil.statisticsMetrics(cacheData, newData);
-            return new Tuple2<>(value1.f0, metricsResult);
+            return new Tuple3<>(value1.f0, metricsResult, value1.f2);
         } catch (RuntimeException e) {
             logger.error("An exception occurred during incremental aggregation! The message is:" + e.getMessage());
             return value1;
         }
     }
 }

View File

@@ -7,6 +7,7 @@ import com.zdjizhi.common.pojo.Fields;
 import com.zdjizhi.common.pojo.Metrics;
 import com.zdjizhi.common.pojo.Tags;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
@@ -17,19 +18,20 @@ import org.apache.flink.util.Collector;
  * @Description:
  * @date 2023/4/2314:43
  */
-public class MergeCountWindow extends ProcessWindowFunction<Tuple2<Tags, Fields>, Metrics, String, TimeWindow> {
+public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields, Long>, Metrics, String, TimeWindow> {
     private static final Log logger = LogFactory.get();

     @Override
-    public void process(String windowKey, Context context, Iterable<Tuple2<Tags, Fields>> input, Collector<Metrics> output) throws Exception {
+    public void process(String windowKey, Context context, Iterable<Tuple3<Tags, Fields, Long>> input, Collector<Metrics> output) throws Exception {
         try {
-            Long endTime = context.window().getStart() / 1000;
-            for (Tuple2<Tags, Fields> tuple : input) {
+            long timestamp = context.window().getStart();
+            for (Tuple3<Tags, Fields, Long> tuple : input) {
                 Tags tags = tuple.f0;
                 Fields fields = tuple.f1;
-                Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, endTime);
+                Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, timestamp);
                 output.collect(metrics);
             }
         } catch (RuntimeException e) {
             logger.error("An exception occurred in the process of full data aggregation! The message is:" + e.getMessage());
         }

View File

@@ -43,9 +43,12 @@ public class FlagsTest {
         common_flags = 16400L;
         System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
-        System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal));
+        System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal) + "\n\n");
+        common_flags = 1062135466L;
+        System.out.println("common_flags & clientIsLocal = " + (common_flags & 128));
+        System.out.println("common_flags & serverIsLocal = " + (common_flags & 256) + "\n\n");

         if ((0L & clientIsLocal) == 0L){
             System.out.println("yes");