diff --git a/pom.xml b/pom.xml
index e9c5e82..2568e90 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>app-protocol-stat-traffic-merge</artifactId>
- <version>230821</version>
+ <version>1.6</version>
<name>app-protocol-stat-traffic-merge</name>
<url>http://www.example.com</url>
@@ -199,27 +199,6 @@
<version>${jasypt.version}</version>
</dependency>
-
- <dependency>
- <groupId>com.alibaba.nacos</groupId>
- <artifactId>nacos-client</artifactId>
- <version>${nacos.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index efe85b3..89d7987 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -30,7 +30,7 @@ buffer.memory=134217728
max.request.size=10485760
#Producer compression mode: none or snappy
-producer.kafka.compression.type=none
+producer.kafka.compression.type=snappy
#Producer acks
producer.ack=1
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 98df5f4..e401d29 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,9 +1,9 @@
#--------------------------------Address configuration------------------------------#
#Management Kafka address
-source.kafka.servers=192.168.44.85:9094
+source.kafka.servers=192.168.44.12:9094
#Management output Kafka address
-sink.kafka.servers=192.168.44.85:9094
+sink.kafka.servers=192.168.44.12:9094
#--------------------------------HTTP------------------------------#
#Kafka certificate path
@@ -11,10 +11,10 @@ tools.library=D:\\workerspace\\dat
#--------------------------------Kafka consumer group info------------------------------#
#Kafka topic for receiving data
-source.kafka.topic=NETWORK-TRAFFIC-METRICS
+source.kafka.topic=APP-PROTOCOL-TEST
#Output topic for enriched data
-sink.kafka.topic=test-result
+sink.kafka.topic=APP-PROTOCOL-TEST-RESULT
#Consumer group for reading the topic; it stores the consumption offsets for this spout id and can be named after the topology. The stored offset position determines where the next read starts so that data is not consumed twice.
group.id=livecharts-test-20230423-2
@@ -32,11 +32,11 @@ window.parallelism=1
#Producer parallelism
sink.parallelism=1
-#Initial randomized pre-aggregation window time
-count.window.time=15
+#Pre-aggregation window time
+count.window.time=5
+
+#Watermark delay
+watermark.max.orderness=5
#Data source: firewall or agent
metrics.data.source=firewall
-
-#Watermark delay
-watermark.max.orderness=60
diff --git a/src/main/java/com/zdjizhi/common/pojo/Metrics.java b/src/main/java/com/zdjizhi/common/pojo/Metrics.java
index 58ecbfb..ae69597 100644
--- a/src/main/java/com/zdjizhi/common/pojo/Metrics.java
+++ b/src/main/java/com/zdjizhi/common/pojo/Metrics.java
@@ -10,14 +10,14 @@ public class Metrics {
private String name;
private Tags tags;
private Fields fields;
- private long timestamp;
+ private long timestamp_ms;
- public Metrics(String name, Tags tags, Fields fields, long timestamp) {
+ public Metrics(String name, Tags tags, Fields fields, long timestamp_ms) {
this.name = name;
this.tags = tags;
this.fields = fields;
- this.timestamp = timestamp;
+ this.timestamp_ms = timestamp_ms;
}
public String getName() {
@@ -44,11 +44,11 @@ public class Metrics {
this.fields = fields;
}
- public long getTimestamp() {
- return timestamp;
+ public long getTimestamp_ms() {
+ return timestamp_ms;
}
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
+ public void setTimestamp_ms(long timestamp_ms) {
+ this.timestamp_ms = timestamp_ms;
}
}
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index bcb0f63..5f046c9 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -15,6 +15,8 @@ import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -65,7 +67,7 @@ public class ApplicationProtocolTopology {
resultFlatMap.addSink(KafkaProducer.getKafkaProducer())
.setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC);
- environment.execute(args[0]);
+ environment.execute("APP-PROTOCOL-STAT-TRAFFIC-MERGE");
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :");
e.printStackTrace();
diff --git a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
index 020fe77..257915c 100644
--- a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
+++ b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
@@ -26,7 +26,7 @@ public class ParsingData extends ProcessFunction<String, Tuple3<Tags, Fields, Long>> {
- out.collect(new Tuple3<>(tags, fields, timestamp));
+ out.collect(new Tuple3<>(tags, fields, timestamp_ms));
}
}
} catch (RuntimeException e) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
index 52f08d2..2677855 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
@@ -24,11 +24,11 @@ public class MergeCountWindow extends ProcessWindowFunction<Tuple3<Tags, Fields, Long>, Metrics, String, TimeWindow> {
public void process(String key, Context context, Iterable<Tuple3<Tags, Fields, Long>> input, Collector<Metrics> output) throws Exception {
try {
- long timestamp = context.window().getStart();
+ long timestamp_ms = context.window().getStart();
for (Tuple3<Tags, Fields, Long> tuple : input) {
Tags tags = tuple.f0;
Fields fields = tuple.f1;
- Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, timestamp);
+ Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, timestamp_ms);
output.collect(metrics);
}
diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties
index 5a6b542..facffc7 100644
--- a/src/main/java/log4j.properties
+++ b/src/main/java/log4j.properties
@@ -1,14 +1,14 @@
#Log4j
-log4j.rootLogger=error,console,file
+log4j.rootLogger=info,console,file
# Console log settings
log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=error
+log4j.appender.console.Threshold=info
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# File log settings
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.file.Threshold=error
+log4j.appender.file.Threshold=info
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
#Use a relative path and verify through testing that output goes under the application directory
@@ -18,8 +18,8 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
#MyBatis configuration; com.nis.web.dao is the package containing the MyBatis interfaces
-log4j.logger.com.nis.web.dao=error
+log4j.logger.com.nis.web.dao=info
#BoneCP data source configuration
-log4j.category.com.jolbox=error,console
+log4j.category.com.jolbox=info,console
diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java
new file mode 100644
index 0000000..7b9580b
--- /dev/null
+++ b/src/test/java/com/zdjizhi/ConfigTest.java
@@ -0,0 +1,56 @@
+package com.zdjizhi;
+
+import com.zdjizhi.conf.FusionConfiguration;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.util.Collector;
+
+import static com.zdjizhi.conf.FusionConfigs.*;
+
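+/**
+ * Local smoke test for configuration loading: reads a properties file with ParameterTool,
+ * prints the resolved source/sink topics and the prefix-filtered Kafka properties, and runs
+ * a simple pass-through pipeline that prints every record consumed from the source topic.
+ */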
+public class ConfigTest {
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final ParameterTool tool;
+ try {
+ tool = ParameterTool.fromPropertiesFile("D:\\workerspace\\flink\\test.properties");
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final FusionConfiguration fusionConfiguration = new FusionConfiguration(config);
+
+ System.out.println(config.get(SOURCE_KAFKA_TOPIC));
+ System.out.println(config.get(SINK_KAFKA_TOPIC));
+ System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
+ System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
+
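+ // Build the source consumer from the "source.kafka.props."-prefixed entries of the loaded file.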
+ final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
+ config.get(SOURCE_KAFKA_TOPIC),
+ new SimpleStringSchema(),
+ fusionConfiguration
+ .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
+
+ final DataStreamSource<String> sourceStream = environment.addSource(kafkaConsumer);
+
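+ // Pass-through ProcessFunction: forwards each record unchanged so print() shows what was consumed.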
+ sourceStream.process(new ProcessFunction<String, String>() {
+
+ @Override
+ public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
+
+ out.collect(value);
+ }
+ }).print();
+
+ environment.execute();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/test/java/com/zdjizhi/conf/FusionConfigs.java b/src/test/java/com/zdjizhi/conf/FusionConfigs.java
new file mode 100644
index 0000000..ca18112
--- /dev/null
+++ b/src/test/java/com/zdjizhi/conf/FusionConfigs.java
@@ -0,0 +1,34 @@
+package com.zdjizhi.conf;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class FusionConfigs {
+ /**
+ * The prefix for Kafka properties used in the source.
+ */
+ public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
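+ // Keys in the job properties file carry this prefix, e.g. "source.kafka.props.group.id" (illustrative key).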
+
+ /**
+ * The prefix for Kafka properties used in the sink.
+ */
+ public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
+ /**
+ * Configuration option for the Kafka topic used in the source.
+ */
+ public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
+ ConfigOptions.key("source.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the source.");
+
+ /**
+ * Configuration option for the Kafka topic used in the sink.
+ */
+ public static final ConfigOption<String> SINK_KAFKA_TOPIC =
+ ConfigOptions.key("sink.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the sink.");
+
+}
diff --git a/src/test/java/com/zdjizhi/conf/FusionConfiguration.java b/src/test/java/com/zdjizhi/conf/FusionConfiguration.java
new file mode 100644
index 0000000..1e6dcf2
--- /dev/null
+++ b/src/test/java/com/zdjizhi/conf/FusionConfiguration.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.conf;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Properties;
+
+public class FusionConfiguration {
+ private final Configuration config;
+
+ public FusionConfiguration(final Configuration config) {
+ this.config = config;
+ }
+
+ /**
+ * Retrieves properties from the underlying `Configuration` instance that start with the specified
+ * `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
+ *
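+ * <p>For example (illustrative values), an entry {@code source.kafka.props.group.id=test} becomes
+ * {@code group.id=test} when this method is called with the source prefix.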
+ * @param prefix The prefix to filter properties.
+ * @return A `java.util.Properties` object containing the properties with the specified prefix.
+ */
+ public Properties getProperties(final String prefix) {
+ if (prefix == null) {
+ final Properties props = new Properties();
+ props.putAll(config.toMap());
+ return props;
+ }
+ return config.toMap()
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getKey().startsWith(prefix))
+ .collect(Properties::new, (props, e) ->
+ props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
+ Properties::putAll);
+ }
+
+}