diff --git a/pom.xml b/pom.xml
index b8e161b..e9c5e82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
     <groupId>com.zdjizhi</groupId>
     <artifactId>app-protocol-stat-traffic-merge</artifactId>
-    <version>230710-Time</version>
+    <version>230821</version>
 
     <name>app-protocol-stat-traffic-merge</name>
     <url>http://www.example.com</url>
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index c979817..98df5f4 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,24 +1,23 @@
 #--------------------------------Address configuration------------------------------#
-
 #Source Kafka address
-source.kafka.servers=192.168.44.12:9094
+source.kafka.servers=192.168.44.85:9094
 #Sink (output) Kafka address
-sink.kafka.servers=192.168.44.12:9094
+sink.kafka.servers=192.168.44.85:9094
 
 #--------------------------------HTTP------------------------------#
 #Kafka certificate path
 tools.library=D:\\workerspace\\dat
 
-#--------------------------------Kafka consumer group info------------------------------#
+#--------------------------------Kafka consumer group info------------------------------#
 #Kafka topic to consume data from
-source.kafka.topic=etl-test
+source.kafka.topic=NETWORK-TRAFFIC-METRICS
 #Output topic for the enriched data
-sink.kafka.topic=etl-test-result
+sink.kafka.topic=test-result
 #Consumer group that stores this spout id's offsets; may be named after the topology. The stored offsets determine where the next read resumes without duplicates.
-group.id=livecharts-test-20230423-1
+group.id=livecharts-test-20230423-2
 
 #--------------------------------Topology configuration------------------------------#
 #Consumer parallelism
@@ -39,3 +38,5 @@
 count.window.time=15
 #Data source: firewall or agent
 metrics.data.source=firewall
+#Watermark max out-of-orderness (seconds)
+watermark.max.orderness=60
diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
index 84008df..24702a5 100644
--- a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
+++ b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
@@ -27,6 +27,7 @@ public class GlobalConfig {
     public static final String MEASUREMENT_NAME = GlobalConfigLoad.getStringProperty(1, "measurement.name");
     public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism");
     public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism");
+    public static final Integer WATERMARK_MAX_ORDERNESS = GlobalConfigLoad.getIntProperty(0, "watermark.max.orderness");
     public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time");
     public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library");
     public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism");
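Note on the new watermark.max.orderness setting: it bounds how far out of order (in seconds) an event may arrive and still be counted in its event-time window. A minimal, self-contained sketch of the semantics, using the same Flink APIs the topology below wires up; the Tuple3 element types here are illustrative stand-ins:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple3;

    import java.time.Duration;

    public class WatermarkSketch {
        public static void main(String[] args) {
            // The watermark trails the highest event timestamp seen by 60s, so a
            // 15s tumbling window [0, 15000) only fires once an element with a
            // timestamp >= 75000 ms has arrived (75000 - 60000 = 15000).
            WatermarkStrategy<Tuple3<String, String, Long>> strategy = WatermarkStrategy
                    .<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                    .withTimestampAssigner((element, recordTimestamp) -> element.f2);
        }
    }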
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index 0f34770..bcb0f63 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -6,21 +6,22 @@
 import com.zdjizhi.common.config.GlobalConfig;
 import com.zdjizhi.common.pojo.Fields;
 import com.zdjizhi.common.pojo.Metrics;
 import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.functions.filter.DataTypeFilter;
 import com.zdjizhi.utils.functions.keyby.DimensionKeyBy;
-import com.zdjizhi.utils.functions.map.MetricsParseMap;
 import com.zdjizhi.utils.functions.map.ResultFlatMap;
+import com.zdjizhi.utils.functions.process.ParsingData;
 import com.zdjizhi.utils.functions.statistics.DispersionCountWindow;
 import com.zdjizhi.utils.functions.statistics.MergeCountWindow;
 import com.zdjizhi.utils.kafka.KafkaConsumer;
 import com.zdjizhi.utils.kafka.KafkaProducer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.eventtime.*;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import java.time.Duration;
 
 /**
  * @author qidaijie
@@ -35,33 +36,39 @@ public class ApplicationProtocolTopology {
         try {
             final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
 
-            //Parse the raw logs
+            WatermarkStrategy<Tuple3<Tags, Fields, Long>> strategyForSession = WatermarkStrategy
+                    .<Tuple3<Tags, Fields, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(GlobalConfig.WATERMARK_MAX_ORDERNESS))
+                    .withTimestampAssigner((element, timestamp) -> element.f2);
+
+            //Data source
             DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
                     .setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC);
 
-            SingleOutputStreamOperator<String> appProtocolFilter = streamSource.filter(new DataTypeFilter())
-                    .name("appProtocolFilter").setParallelism(GlobalConfig.SOURCE_PARALLELISM);
+            //Parse the data
+            SingleOutputStreamOperator<Tuple3<Tags, Fields, Long>> parseDataProcess = streamSource.process(new ParsingData())
+                    .assignTimestampsAndWatermarks(strategyForSession)
+                    .name("ParseDataProcess")
+                    .setParallelism(GlobalConfig.PARSE_PARALLELISM);
 
-            SingleOutputStreamOperator<Tuple2<Tags, Fields>> parseDataMap = appProtocolFilter.map(new MetricsParseMap())
-                    .name("ParseDataMap").setParallelism(GlobalConfig.PARSE_PARALLELISM);
-
-            SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataMap.keyBy(new DimensionKeyBy())
-                    .window(TumblingProcessingTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
+            //Incremental aggregation window
+            SingleOutputStreamOperator<Metrics> dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
+                    .window(TumblingEventTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
                     .reduce(new DispersionCountWindow(), new MergeCountWindow())
                     .name("DispersionCountWindow")
                     .setParallelism(GlobalConfig.WINDOW_PARALLELISM);
 
+            //Split the aggregated result
             SingleOutputStreamOperator<Metrics> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
                     .name("ResultFlatMap").setParallelism(GlobalConfig.SINK_PARALLELISM);
-
+            //Sink output
             resultFlatMap.addSink(KafkaProducer.getKafkaProducer())
                     .setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC);
 
             environment.execute(args[0]);
         } catch (Exception e) {
-            logger.error("This Flink task start ERROR! Exception information is :" + e);
+            logger.error("This Flink task start ERROR! Exception information is :");
+            e.printStackTrace();
         }
     }
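ParsingData itself is not part of this diff; only its import and use appear above. For context, a hypothetical sketch of the shape it must have given how the topology calls it: raw String in, a Tuple3 out whose f2 carries the event timestamp that strategyForSession reads. The record layout and the Map stand-ins for the project's Tags/Fields POJOs are assumptions, not the real implementation:

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    import java.util.HashMap;
    import java.util.Map;

    public class ParsingDataSketch
            extends ProcessFunction<String, Tuple3<Map<String, String>, Map<String, Long>, Long>> {

        @Override
        public void processElement(String raw, Context ctx,
                                   Collector<Tuple3<Map<String, String>, Map<String, Long>, Long>> out) {
            // Assumed record layout: "appProtocol,bytes,eventTimeMillis"
            String[] parts = raw.split(",");
            Map<String, String> tags = new HashMap<>();
            tags.put("app_protocol", parts[0]);
            Map<String, Long> fields = new HashMap<>();
            fields.put("bytes", Long.parseLong(parts[1]));
            // f2 is the event-time timestamp the watermark assigner reads (element.f2).
            out.collect(Tuple3.of(tags, fields, Long.parseLong(parts[2])));
        }
    }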
Exception information is :"); + e.printStackTrace(); } } diff --git a/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java b/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java index 4393729..eed832f 100644 --- a/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java +++ b/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java @@ -4,6 +4,9 @@ import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Tags; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; + +import java.sql.Timestamp; /** * @author qidaijie @@ -11,10 +14,10 @@ import org.apache.flink.api.java.tuple.Tuple2; * @Description: * @date 2021/7/2112:13 */ -public class DimensionKeyBy implements KeySelector, String> { +public class DimensionKeyBy implements KeySelector, String> { @Override - public String getKey(Tuple2 value) throws Exception { + public String getKey(Tuple3 value) throws Exception { //以map拼接的key分组 return value.f0.toString(); } diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java index 57ebde1..8216320 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java @@ -3,10 +3,12 @@ package com.zdjizhi.utils.functions.statistics; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.pojo.Fields; +import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Tags; import com.zdjizhi.utils.general.MetricUtil; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; /** * @author qidaijie @@ -14,21 +16,23 @@ import org.apache.flink.api.java.tuple.Tuple2; * @Description: * @date 2023/4/2314:02 */ -public class DispersionCountWindow implements ReduceFunction> { +public class DispersionCountWindow implements ReduceFunction> { private static final Log logger = LogFactory.get(); @Override - public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception { + public Tuple3 reduce(Tuple3 value1, Tuple3 value2) throws Exception { try { Fields cacheData = value1.f1; Fields newData = value2.f1; Fields metricsResult = MetricUtil.statisticsMetrics(cacheData, newData); - return new Tuple2<>(value1.f0, metricsResult); + return new Tuple3<>(value1.f0, metricsResult, value1.f2); } catch (RuntimeException e) { logger.error("An exception occurred during incremental aggregation! 
The message is:" + e.getMessage()); return value1; } } + + } diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java index e3179a7..52f08d2 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java @@ -7,6 +7,7 @@ import com.zdjizhi.common.pojo.Fields; import com.zdjizhi.common.pojo.Metrics; import com.zdjizhi.common.pojo.Tags; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; @@ -17,19 +18,20 @@ import org.apache.flink.util.Collector; * @Description: * @date 2023/4/2314:43 */ -public class MergeCountWindow extends ProcessWindowFunction, Metrics, String, TimeWindow> { +public class MergeCountWindow extends ProcessWindowFunction, Metrics, String, TimeWindow> { private static final Log logger = LogFactory.get(); @Override - public void process(String windowKey, Context context, Iterable> input, Collector output) throws Exception { + public void process(String windowKey, Context context, Iterable> input, Collector output) throws Exception { try { - Long endTime = context.window().getStart() / 1000; - for (Tuple2 tuple : input) { + long timestamp = context.window().getStart(); + for (Tuple3 tuple : input) { Tags tags = tuple.f0; Fields fields = tuple.f1; - Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, endTime); + Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, timestamp); output.collect(metrics); } + } catch (RuntimeException e) { logger.error("An exception occurred in the process of full data aggregation! The message is:" + e.getMessage()); } diff --git a/src/test/java/com/zdjizhi/FlagsTest.java b/src/test/java/com/zdjizhi/FlagsTest.java index 15ee4aa..ce59840 100644 --- a/src/test/java/com/zdjizhi/FlagsTest.java +++ b/src/test/java/com/zdjizhi/FlagsTest.java @@ -43,9 +43,12 @@ public class FlagsTest { common_flags = 16400L; System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal)); - System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)); + System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)+"\n\n"); + common_flags = 1062135466L; + System.out.println("common_flags & clientIsLocal = " + (common_flags & 128)); + System.out.println("common_flags & serverIsLocal = " + (common_flags & 256)+"\n\n"); if ((0L & clientIsLocal) == 0L){ System.out.println("yes");