From f765650d9c7c2f6f1c9c5fa6a37ef74f6f896211 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Mon, 18 Sep 2023 15:09:51 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E6=97=B6=E9=97=B4=E6=88=B3?= =?UTF-8?q?=E5=AD=97=E6=AE=B5=E9=87=8D=E5=91=BD=E5=90=8Dtimestamp=E6=94=B9?= =?UTF-8?q?=E4=B8=BAtimestamp=5Fms=E3=80=82=EF=BC=88TSG-17084=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 23 +------- properties/default_config.properties | 2 +- properties/service_flow_config.properties | 18 +++--- .../java/com/zdjizhi/common/pojo/Metrics.java | 14 ++--- .../topology/ApplicationProtocolTopology.java | 4 +- .../utils/functions/process/ParsingData.java | 4 +- .../statistics/MergeCountWindow.java | 4 +- src/main/java/log4j.properties | 10 ++-- src/test/java/com/zdjizhi/ConfigTest.java | 56 +++++++++++++++++++ .../java/com/zdjizhi/conf/FusionConfigs.java | 34 +++++++++++ .../com/zdjizhi/conf/FusionConfiguration.java | 36 ++++++++++++ 11 files changed, 156 insertions(+), 49 deletions(-) create mode 100644 src/test/java/com/zdjizhi/ConfigTest.java create mode 100644 src/test/java/com/zdjizhi/conf/FusionConfigs.java create mode 100644 src/test/java/com/zdjizhi/conf/FusionConfiguration.java diff --git a/pom.xml b/pom.xml index e9c5e82..2568e90 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi app-protocol-stat-traffic-merge - 230821 + 1.6 app-protocol-stat-traffic-merge http://www.example.com @@ -199,27 +199,6 @@ ${jasypt.version} - - - com.alibaba.nacos - nacos-client - ${nacos.version} - - - com.google.guava - guava - - - slf4j-log4j12 - org.slf4j - - - log4j-over-slf4j - org.slf4j - - - - junit junit diff --git a/properties/default_config.properties b/properties/default_config.properties index efe85b3..89d7987 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -30,7 +30,7 @@ buffer.memory=134217728 max.request.size=10485760 #生产者压缩模式 none or snappy -producer.kafka.compression.type=none +producer.kafka.compression.type=snappy #生产者ack producer.ack=1 diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 98df5f4..e401d29 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,9 +1,9 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -source.kafka.servers=192.168.44.85:9094 +source.kafka.servers=192.168.44.12:9094 #管理输出kafka地址 -sink.kafka.servers=192.168.44.85:9094 +sink.kafka.servers=192.168.44.12:9094 #--------------------------------HTTP------------------------------# #kafka 证书地址 @@ -11,10 +11,10 @@ tools.library=D:\\workerspace\\dat #--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic -source.kafka.topic=NETWORK-TRAFFIC-METRICS +source.kafka.topic=APP-PROTOCOL-TEST #补全数据 输出 topic -sink.kafka.topic=test-result +sink.kafka.topic=APP-PROTOCOL-TEST-RESULT #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; group.id=livecharts-test-20230423-2 @@ -32,11 +32,11 @@ window.parallelism=1 #producer 并行度 sink.parallelism=1 -#初次随机预聚合窗口时间 -count.window.time=15 +#预聚合窗口时间 +count.window.time=5 + +#watermark延迟 +watermark.max.orderness=5 #数据源 firewall or agent metrics.data.source=firewall - -#watermark延迟 -watermark.max.orderness=60 diff --git a/src/main/java/com/zdjizhi/common/pojo/Metrics.java b/src/main/java/com/zdjizhi/common/pojo/Metrics.java index 58ecbfb..ae69597 100644 --- a/src/main/java/com/zdjizhi/common/pojo/Metrics.java +++ b/src/main/java/com/zdjizhi/common/pojo/Metrics.java @@ -10,14 +10,14 @@ public class Metrics { private String name; private Tags tags; private Fields fields; - private long timestamp; + private long timestamp_ms; - public Metrics(String name, Tags tags, Fields fields, long timestamp) { + public Metrics(String name, Tags tags, Fields fields, long timestamp_ms) { this.name = name; this.tags = tags; this.fields = fields; - this.timestamp = timestamp; + this.timestamp_ms = timestamp_ms; } public String getName() { @@ -44,11 +44,11 @@ public class Metrics { this.fields = fields; } - public long getTimestamp() { - return timestamp; + public long getTimestamp_ms() { + return timestamp_ms; } - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; + public void setTimestamp_ms(long timestamp_ms) { + this.timestamp_ms = timestamp_ms; } } diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java index bcb0f63..5f046c9 100644 --- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java +++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java @@ -15,6 +15,8 @@ import com.zdjizhi.utils.kafka.KafkaConsumer; import com.zdjizhi.utils.kafka.KafkaProducer; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -65,7 +67,7 @@ public class ApplicationProtocolTopology { resultFlatMap.addSink(KafkaProducer.getKafkaProducer()) .setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC); - environment.execute(args[0]); + environment.execute("APP-PROTOCOL-STAT-TRAFFIC-MERGE"); } catch (Exception e) { logger.error("This Flink task start ERROR! Exception information is :"); e.printStackTrace(); diff --git a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java index 020fe77..257915c 100644 --- a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java +++ b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java @@ -26,7 +26,7 @@ public class ParsingData extends ProcessFunction(tags, fields, timestamp)); + out.collect(new Tuple3<>(tags, fields, timestamp_ms)); } } } catch (RuntimeException e) { diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java index 52f08d2..2677855 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java @@ -24,11 +24,11 @@ public class MergeCountWindow extends ProcessWindowFunction> input, Collector output) throws Exception { try { - long timestamp = context.window().getStart(); + long timestamp_ms = context.window().getStart(); for (Tuple3 tuple : input) { Tags tags = tuple.f0; Fields fields = tuple.f1; - Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, timestamp); + Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, timestamp_ms); output.collect(metrics); } diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties index 5a6b542..facffc7 100644 --- a/src/main/java/log4j.properties +++ b/src/main/java/log4j.properties @@ -1,14 +1,14 @@ #Log4j -log4j.rootLogger=error,console,file +log4j.rootLogger=info,console,file # 控制台日志设置 log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.Threshold=error +log4j.appender.console.Threshold=info log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n # 文件日志设置 log4j.appender.file=org.apache.log4j.DailyRollingFileAppender -log4j.appender.file.Threshold=error +log4j.appender.file.Threshold=info log4j.appender.file.encoding=UTF-8 log4j.appender.file.Append=true #路径请用相对路径,做好相关测试输出到应用目下 @@ -18,8 +18,8 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout #log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n #MyBatis 配置,com.nis.web.dao是mybatis接口所在包 -log4j.logger.com.nis.web.dao=error +log4j.logger.com.nis.web.dao=info #bonecp数据源配置 -log4j.category.com.jolbox=error,console +log4j.category.com.jolbox=info,console diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java new file mode 100644 index 0000000..7b9580b --- /dev/null +++ b/src/test/java/com/zdjizhi/ConfigTest.java @@ -0,0 +1,56 @@ +package com.zdjizhi; + +import com.zdjizhi.conf.FusionConfiguration; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.util.Collector; + +import static com.zdjizhi.conf.FusionConfigs.*; + +public class ConfigTest { + public static void main(String[] args) { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + final ParameterTool tool; + try { + tool = ParameterTool.fromPropertiesFile("D:\\workerspace\\flink\\test.properties"); + final Configuration config = tool.getConfiguration(); + environment.getConfig().setGlobalJobParameters(config); + final FusionConfiguration fusionConfiguration = new FusionConfiguration(config); + + System.out.println(config.get(SOURCE_KAFKA_TOPIC)); + System.out.println(config.get(SINK_KAFKA_TOPIC)); + System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX)); + System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); + + final FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>( + config.get(SOURCE_KAFKA_TOPIC), + new SimpleStringSchema(), + fusionConfiguration + .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX)); + + final DataStreamSource sourceStream = environment.addSource(kafkaConsumer); + + sourceStream.process(new ProcessFunction() { + + @Override + public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception { + + out.collect(value); + } + }).print(); + + environment.execute(); + + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/java/com/zdjizhi/conf/FusionConfigs.java b/src/test/java/com/zdjizhi/conf/FusionConfigs.java new file mode 100644 index 0000000..ca18112 --- /dev/null +++ b/src/test/java/com/zdjizhi/conf/FusionConfigs.java @@ -0,0 +1,34 @@ +package com.zdjizhi.conf; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class FusionConfigs { + /** + * The prefix for Kafka properties used in the source. + */ + public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props."; + + /** + * The prefix for Kafka properties used in the sink. + */ + public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props."; + /** + * Configuration option for the Kafka topic used in the source. + */ + public static final ConfigOption SOURCE_KAFKA_TOPIC = + ConfigOptions.key("source.kafka.topic") + .stringType() + .noDefaultValue() + .withDescription("The Kafka topic used in the source."); + + /** + * Configuration option for the Kafka topic used in the sink. + */ + public static final ConfigOption SINK_KAFKA_TOPIC = + ConfigOptions.key("sink.kafka.topic") + .stringType() + .noDefaultValue() + .withDescription("The Kafka topic used in the sink."); + +} diff --git a/src/test/java/com/zdjizhi/conf/FusionConfiguration.java b/src/test/java/com/zdjizhi/conf/FusionConfiguration.java new file mode 100644 index 0000000..1e6dcf2 --- /dev/null +++ b/src/test/java/com/zdjizhi/conf/FusionConfiguration.java @@ -0,0 +1,36 @@ +package com.zdjizhi.conf; + +import org.apache.flink.configuration.Configuration; + +import java.util.Properties; + +public class FusionConfiguration { + private final Configuration config; + + public FusionConfiguration(final Configuration config) { + this.config = config; + } + + /** + * Retrieves properties from the underlying `Configuration` instance that start with the specified + * `prefix`. The properties are then converted into a `java.util.Properties` object and returned. + * + * @param prefix The prefix to filter properties. + * @return A `java.util.Properties` object containing the properties with the specified prefix. + */ + public Properties getProperties(final String prefix) { + if (prefix == null) { + final Properties props = new Properties(); + props.putAll(config.toMap()); + return props; + } + return config.toMap() + .entrySet() + .stream() + .filter(entry -> entry.getKey().startsWith(prefix)) + .collect(Properties::new, (props, e) -> + props.setProperty(e.getKey().substring(prefix.length()), e.getValue()), + Properties::putAll); + } + +}