diff --git a/README.md b/README.md
index b888ea8..2705dfc 100644
--- a/README.md
+++ b/README.md
@@ -1,22 +1,65 @@
# app-protocol-stat-traffic-merge
-Live Traffic Chart统计程序,基于协议栈拆分多流聚合,存储到协议与应用统计表中,使用增量窗口计算,周期15秒。
+Live Traffic Chart统计程序,基于协议栈拆分多流聚合,存储到协议与应用统计表中,使用增量窗口计算,统计周期为5秒,watermark为5秒。
## 数据源
-以下不论基于哪种计算,Topic均为NETWORK-TRAFFIC-METRICS
-### 1.app-protocol-stat-traffic-agent程序基于已关闭/过渡会话日志统计Application and Protocol Metrics,聚合粒度为1秒。
-### 2.功能端进行统计产生的Application and Protocol Metrics数据,聚合粒度为1秒。
-## 统计操作
-### 1.过滤name是traffic_application_protocol_stat的数据。
+以下不论基于哪种计算,Topic均为NETWORK-TRAFFIC-METRICS
+
+### 1.app-protocol-stat-traffic-agent程序基于已关闭/过渡会话日志统计Application and Protocol Metrics,聚合粒度为1秒。(TSG 23.05版本)
+
+### 2.功能端进行统计产生的Application and Protocol Metrics数据,聚合粒度为1秒。(≥TSG 23.05版本)
+
+## 操作
+
+### 1.过滤Measurement Name是traffic_application_protocol_stat的数据。
+
### 2.基于Tags内容进行分组统计。
+
### 3.拆分protocol_stack_id协议树为多个节点
+
#### 例如,ETHERNET.IPv4.TCP.https.kingsoft.wps_office,每个节点ID为:
+
##### ETHERNET
+
##### ETHERNET.IPv4
+
##### ETHERNET.IPv4.TCP
+
##### ETHERNET.IPv4.TCP.https
+
##### ETHERNET.IPv4.TCP.https.kingsoft
+
##### ETHERNET.IPv4.TCP.https.kingsoft.wps_office
+
### 4.app_name仅在终端节点输出。
+
### 5.输出结果时Measurement Name=application_protocol_stat。
+
+
+
+## 启动
+
+Standalone:
+
+`flink run [-p parallelism] -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties`
+
+Yarn:
+
+`flink run -t yarn-per-job -Djobmanager.memory.process.size=1024m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name=APP-PROTOCOL-STAT-TRAFFIC-MERGE -Dtaskmanager.numberOfTaskSlots=1 -d -p 3 -c com.zdjizhi.topology.ApplicationProtocolTopology app-protocol-stat-traffic-merge-[version].jar app.properties`
+
+
+
+## 配置项说明
+
+|配置项|类型|必填|默认值|含义|
+|--|--|--|--|--|
+|source.kafka.topic|STRING|Y||数据源的Kafka Topic 名称|
+|source.kafka.props.*|STRING|N||数据源的Kafka 消费者连接相关参数|
+|startup.mode|STRING|N|group|数据源消费策略(group:从当前消费组的偏移量开始,latest:从分区最新的偏移量开始,earliest:从分区最早的偏移量开始)|
+|sink.kafka.topic|STRING|Y||数据输出的Kafka Topic 名称|
+|sink.kafka.props.*|STRING|N||数据输出的Kafka 生产者连接相关参数|
+|count.window.time|INT|N|5|聚合窗口大小(单位:秒)|
+|watermark.max.orderness|INT|N|5|乱序数据的最大延迟时间(单位:秒)|
+|log.failures.only|BOOLEAN|N|false|true:生产者出现错误时任务失败,false:只记录错误信息|
+|measurement.name|STRING|N|application_protocol_stat|数据输出时的指标标识名称|
diff --git a/pom.xml b/pom.xml
index b8e161b..e76331f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
app-protocol-stat-traffic-merge
- 230710-Time
+ 2.0.1
app-protocol-stat-traffic-merge
http://www.example.com
@@ -199,27 +199,6 @@
${jasypt.version}
-
-
- com.alibaba.nacos
- nacos-client
- ${nacos.version}
-
-
- com.google.guava
- guava
-
-
- slf4j-log4j12
- org.slf4j
-
-
- log4j-over-slf4j
- org.slf4j
-
-
-
-
junit
junit
@@ -227,12 +206,6 @@
test
-
- org.apache.datasketches
- datasketches-java
- ${datasketches.version}
-
-
com.alibaba
fastjson
diff --git a/properties/application.properties b/properties/application.properties
new file mode 100644
index 0000000..0f66f4e
--- /dev/null
+++ b/properties/application.properties
@@ -0,0 +1,27 @@
+#kafka 接收数据topic
+source.kafka.topic=NETWORK-TRAFFIC-METRIC-TEST
+
+source.kafka.props.bootstrap.servers=192.168.44.12:9094
+
+source.kafka.props.group.id=app-protocol-merge-231109-1
+
+source.kafka.props.security.protocol=SASL_PLAINTEXT
+
+source.kafka.props.sasl.mechanism=PLAIN
+
+source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+
+#补全数据 输出 topic
+sink.kafka.topic=APP-PROTOCOL-TEST-RESULT
+
+sink.kafka.props.bootstrap.servers=192.168.44.12:9094
+
+sink.kafka.props.security.protocol=SASL_PLAINTEXT
+
+sink.kafka.props.sasl.mechanism=PLAIN
+
+sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+
+count.window.time=5
+
+watermark.max.orderness=5
\ No newline at end of file
diff --git a/properties/default_config.properties b/properties/default_config.properties
deleted file mode 100644
index efe85b3..0000000
--- a/properties/default_config.properties
+++ /dev/null
@@ -1,46 +0,0 @@
-#====================Kafka KafkaConsumer====================#
-#kafka source connection timeout
-session.timeout.ms=60000
-
-#kafka source poll
-max.poll.records=5000
-
-#kafka source poll bytes
-max.partition.fetch.bytes=31457280
-
-#====================Kafka KafkaProducer====================#
-#producer重试的次数设置
-retries=0
-
-#他的含义就是说一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了
-linger.ms=10
-
-#如果在超时之前未收到响应,客户端将在必要时重新发送请求
-request.timeout.ms=30000
-
-#producer都是按照batch进行发送的,批次大小,默认:16384
-batch.size=262144
-
-#Producer端用于缓存消息的缓冲区大小
-#128M
-buffer.memory=134217728
-
-#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576
-#10M
-max.request.size=10485760
-
-#生产者压缩模式 none or snappy
-producer.kafka.compression.type=none
-
-#生产者ack
-producer.ack=1
-
-#====================kafka default====================#
-#kafka SASL验证用户名-加密
-kafka.user=nsyGpHKGFA4KW0zro9MDdw==
-
-#kafka SASL及SSL验证密码-加密
-kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
-
-#====================Topology Default====================#
-measurement.name=application_protocol_stat
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
deleted file mode 100644
index c979817..0000000
--- a/properties/service_flow_config.properties
+++ /dev/null
@@ -1,41 +0,0 @@
-#--------------------------------地址配置------------------------------#
-
-#管理kafka地址
-source.kafka.servers=192.168.44.12:9094
-
-#管理输出kafka地址
-sink.kafka.servers=192.168.44.12:9094
-
-#--------------------------------HTTP------------------------------#
-#kafka 证书地址
-tools.library=D:\\workerspace\\dat
-#--------------------------------Kafka消费组信息------------------------------#
-
-#kafka 接收数据topic
-source.kafka.topic=etl-test
-
-#补全数据 输出 topic
-sink.kafka.topic=etl-test-result
-
-#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=livecharts-test-20230423-1
-
-#--------------------------------topology配置------------------------------#
-#consumer 并行度
-source.parallelism=1
-
-#map函数并行度
-parse.parallelism=1
-
-#第一次窗口计算并行度
-window.parallelism=1
-
-#producer 并行度
-sink.parallelism=1
-
-#初次随机预聚合窗口时间
-count.window.time=15
-
-#数据源 firewall or agent
-metrics.data.source=firewall
-
diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java b/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
deleted file mode 100644
index 84008df..0000000
--- a/src/main/java/com/zdjizhi/common/config/GlobalConfig.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package com.zdjizhi.common.config;
-
-
-import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
-
-/**
- * @author Administrator
- */
-public class GlobalConfig {
-
- private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
-
- static {
- encryptor.setPassword("galaxy");
- }
-
- /**
- * 协议分隔符,需要转义
- */
- public static final String PROTOCOL_SPLITTER = "\\.";
-
-
- /**
- * System
- */
- public static final Integer SOURCE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "source.parallelism");
- public static final String MEASUREMENT_NAME = GlobalConfigLoad.getStringProperty(1, "measurement.name");
- public static final Integer PARSE_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "parse.parallelism");
- public static final Integer WINDOW_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "window.parallelism");
- public static final Integer COUNT_WINDOW_TIME = GlobalConfigLoad.getIntProperty(0, "count.window.time");
- public static final String TOOLS_LIBRARY = GlobalConfigLoad.getStringProperty(0, "tools.library");
- public static final Integer SINK_PARALLELISM = GlobalConfigLoad.getIntProperty(0, "sink.parallelism");
- public static final String METICS_DATA_SOURCE = GlobalConfigLoad.getStringProperty(0, "metrics.data.source");
-
- /**
- * Kafka common
- */
- public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.user"));
- public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(GlobalConfigLoad.getStringProperty(1, "kafka.pin"));
-
-
- /**
- * kafka sink config
- */
- public static final String SINK_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "sink.kafka.servers");
- public static final String SINK_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "sink.kafka.topic");
- public static final String PRODUCER_ACK = GlobalConfigLoad.getStringProperty(1, "producer.ack");
- public static final String RETRIES = GlobalConfigLoad.getStringProperty(1, "retries");
- public static final String LINGER_MS = GlobalConfigLoad.getStringProperty(1, "linger.ms");
- public static final Integer REQUEST_TIMEOUT_MS = GlobalConfigLoad.getIntProperty(1, "request.timeout.ms");
- public static final Integer BATCH_SIZE = GlobalConfigLoad.getIntProperty(1, "batch.size");
- public static final Integer BUFFER_MEMORY = GlobalConfigLoad.getIntProperty(1, "buffer.memory");
- public static final Integer MAX_REQUEST_SIZE = GlobalConfigLoad.getIntProperty(1, "max.request.size");
-
-
- /**
- * kafka source config
- */
- public static final String SOURCE_KAFKA_SERVERS = GlobalConfigLoad.getStringProperty(0, "source.kafka.servers");
- public static final String SOURCE_KAFKA_TOPIC = GlobalConfigLoad.getStringProperty(0, "source.kafka.topic");
- public static final String GROUP_ID = GlobalConfigLoad.getStringProperty(0, "group.id");
- public static final String SESSION_TIMEOUT_MS = GlobalConfigLoad.getStringProperty(1, "session.timeout.ms");
- public static final String MAX_POLL_RECORDS = GlobalConfigLoad.getStringProperty(1, "max.poll.records");
- public static final String MAX_PARTITION_FETCH_BYTES = GlobalConfigLoad.getStringProperty(1, "max.partition.fetch.bytes");
-
-
- /**
- * kafka限流配置-20201117
- */
- public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = GlobalConfigLoad.getStringProperty(1, "producer.kafka.compression.type");
-
-
-}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java b/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java
deleted file mode 100644
index 0ae91e5..0000000
--- a/src/main/java/com/zdjizhi/common/config/GlobalConfigLoad.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.zdjizhi.common.config;
-
-import com.zdjizhi.utils.StringUtil;
-
-import java.io.IOException;
-import java.util.Locale;
-import java.util.Properties;
-
-
-/**
- * @author Administrator
- */
-
-public final class GlobalConfigLoad {
-
- private static Properties propDefault = new Properties();
- private static Properties propService = new Properties();
-
-
- static String getStringProperty(Integer type, String key) {
- if (type == 0) {
- return propService.getProperty(key);
- } else if (type == 1) {
- return propDefault.getProperty(key);
- } else {
- return null;
- }
-
- }
-
- static Integer getIntProperty(Integer type, String key) {
- if (type == 0) {
- return Integer.parseInt(propService.getProperty(key));
- } else if (type == 1) {
- return Integer.parseInt(propDefault.getProperty(key));
- } else {
- return null;
- }
- }
-
- public static Long getLongProperty(Integer type, String key) {
- if (type == 0) {
- return Long.parseLong(propService.getProperty(key));
- } else if (type == 1) {
- return Long.parseLong(propDefault.getProperty(key));
- } else {
- return null;
- }
- }
-
- public static Boolean getBooleanProperty(Integer type, String key) {
- if (type == 0) {
- return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
- } else if (type == 1) {
- return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
- } else {
- return null;
- }
- }
-
- static {
- try {
- propService.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
- propDefault.load(GlobalConfigLoad.class.getClassLoader().getResourceAsStream("default_config.properties"));
- } catch (IOException | RuntimeException e) {
- propDefault = null;
- propService = null;
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfigs.java b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
new file mode 100644
index 0000000..8cf604a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfigs.java
@@ -0,0 +1,79 @@
+package com.zdjizhi.common.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Configuration options for the Merge application.
+ *
+ * @author chaoc
+ * @since 1.0
+ */
+public class MergeConfigs {
+
+ /**
+ * The prefix for Kafka properties used in the source.
+ */
+ public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
+
+ /**
+ * The prefix for Kafka properties used in the sink.
+ */
+ public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
+
+
+ public static final ConfigOption SOURCE_KAFKA_TOPIC =
+ ConfigOptions.key("source.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the source.");
+
+
+ public static final ConfigOption SINK_KAFKA_TOPIC =
+ ConfigOptions.key("sink.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the sink.");
+
+
+ public static final ConfigOption COUNT_WINDOW_TIME =
+ ConfigOptions.key("count.window.time")
+ .intType()
+ .defaultValue(5)
+ .withDescription("The aggregate window time");
+
+
+ public static final ConfigOption WARTERMARK_MAX_ORDERNESS =
+ ConfigOptions.key("watermark.max.orderness")
+ .intType()
+ .defaultValue(5)
+ .withDescription("The aggregate watermark max time");
+
+
+ public static final ConfigOption STARTUP_MODE =
+ ConfigOptions.key("startup.mode")
+ .stringType()
+ .defaultValue("group")
+ .withDescription("The offset commit mode for the consumer.");
+
+
+ public static final ConfigOption LOG_FAILURES_ONLY =
+ ConfigOptions.key("log.failures.only")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Defines whether the producer should fail on errors, or only log them.");
+
+
+ public static final ConfigOption MEASUREMENT_NAME =
+ ConfigOptions.key("measurement.name")
+ .stringType()
+ .defaultValue("application_protocol_stat")
+ .withDescription("The data identification.");
+
+
+ public static final ConfigOption JOB_NAME =
+ ConfigOptions.key("job.name")
+ .stringType()
+ .defaultValue("agg_app_protocol_traffic")
+ .withDescription("The flink job name.");
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java b/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java
new file mode 100644
index 0000000..4da7616
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/config/MergeConfiguration.java
@@ -0,0 +1,44 @@
+package com.zdjizhi.common.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Properties;
+
+/**
+ * A wrapper class that extends the Flink `Configuration` to provide utility methods for handling
+ * properties with a specific prefix. This class allows retrieving properties that start with the
+ * given `prefix` and converts them into a `java.util.Properties` object.
+ *
+ * @author chaoc
+ * @since 1.0
+ */
+
+public class MergeConfiguration {
+ private final Configuration config;
+
+ public MergeConfiguration(final Configuration config) {
+ this.config = config;
+ }
+
+ /**
+ * Retrieves properties from the underlying `Configuration` instance that start with the specified
+ * `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
+ *
+ * @param prefix The prefix to filter properties.
+ * @return A `java.util.Properties` object containing the properties with the specified prefix.
+ */
+ public Properties getProperties(final String prefix) {
+ if (prefix == null) {
+ final Properties props = new Properties();
+ props.putAll(config.toMap());
+ return props;
+ }
+ return config.toMap()
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getKey().startsWith(prefix))
+ .collect(Properties::new, (props, e) ->
+ props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
+ Properties::putAll);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/pojo/Fields.java b/src/main/java/com/zdjizhi/common/pojo/Fields.java
index baa5b25..0e32a56 100644
--- a/src/main/java/com/zdjizhi/common/pojo/Fields.java
+++ b/src/main/java/com/zdjizhi/common/pojo/Fields.java
@@ -7,28 +7,27 @@ package com.zdjizhi.common.pojo;
* @date 2023/4/2311:47
*/
public class Fields {
- private Long sessions;
- private Long in_bytes;
- private Long out_bytes;
- private Long in_pkts;
- private Long out_pkts;
- private Long c2s_pkts;
- private Long s2c_pkts;
- private Long c2s_bytes;
- private Long s2c_bytes;
- private Long c2s_fragments;
- private Long s2c_fragments;
- private Long c2s_tcp_lost_bytes;
- private Long s2c_tcp_lost_bytes;
- private Long c2s_tcp_ooorder_pkts;
- private Long s2c_tcp_ooorder_pkts;
- private Long c2s_tcp_retransmitted_pkts;
- private Long s2c_tcp_retransmitted_pkts;
- private Long c2s_tcp_retransmitted_bytes;
- private Long s2c_tcp_retransmitted_bytes;
- private String client_ip_sketch;
+ private long sessions;
+ private long in_bytes;
+ private long out_bytes;
+ private long in_pkts;
+ private long out_pkts;
+ private long c2s_pkts;
+ private long s2c_pkts;
+ private long c2s_bytes;
+ private long s2c_bytes;
+ private long c2s_fragments;
+ private long s2c_fragments;
+ private long c2s_tcp_lost_bytes;
+ private long s2c_tcp_lost_bytes;
+ private long c2s_tcp_ooorder_pkts;
+ private long s2c_tcp_ooorder_pkts;
+ private long c2s_tcp_retransmitted_pkts;
+ private long s2c_tcp_retransmitted_pkts;
+ private long c2s_tcp_retransmitted_bytes;
+ private long s2c_tcp_retransmitted_bytes;
- public Fields(Long sessions, Long in_bytes, Long out_bytes, Long in_pkts, Long out_pkts, Long c2s_pkts, Long s2c_pkts, Long c2s_bytes, Long s2c_bytes, Long c2s_fragments, Long s2c_fragments, Long c2s_tcp_lost_bytes, Long s2c_tcp_lost_bytes, Long c2s_tcp_ooorder_pkts, Long s2c_tcp_ooorder_pkts, Long c2s_tcp_retransmitted_pkts, Long s2c_tcp_retransmitted_pkts, Long c2s_tcp_retransmitted_bytes, Long s2c_tcp_retransmitted_bytes, String client_ip_sketch) {
+ public Fields(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes) {
this.sessions = sessions;
this.in_bytes = in_bytes;
this.out_bytes = out_bytes;
@@ -48,166 +47,158 @@ public class Fields {
this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
- this.client_ip_sketch = client_ip_sketch;
}
- public Long getSessions() {
+ public long getSessions() {
return sessions;
}
- public void setSessions(Long sessions) {
+ public void setSessions(long sessions) {
this.sessions = sessions;
}
- public Long getIn_bytes() {
+ public long getIn_bytes() {
return in_bytes;
}
- public void setIn_bytes(Long in_bytes) {
+ public void setIn_bytes(long in_bytes) {
this.in_bytes = in_bytes;
}
- public Long getOut_bytes() {
+ public long getOut_bytes() {
return out_bytes;
}
- public void setOut_bytes(Long out_bytes) {
+ public void setOut_bytes(long out_bytes) {
this.out_bytes = out_bytes;
}
- public Long getIn_pkts() {
+ public long getIn_pkts() {
return in_pkts;
}
- public void setIn_pkts(Long in_pkts) {
+ public void setIn_pkts(long in_pkts) {
this.in_pkts = in_pkts;
}
- public Long getOut_pkts() {
+ public long getOut_pkts() {
return out_pkts;
}
- public void setOut_pkts(Long out_pkts) {
+ public void setOut_pkts(long out_pkts) {
this.out_pkts = out_pkts;
}
- public Long getC2s_pkts() {
+ public long getC2s_pkts() {
return c2s_pkts;
}
- public void setC2s_pkts(Long c2s_pkts) {
+ public void setC2s_pkts(long c2s_pkts) {
this.c2s_pkts = c2s_pkts;
}
- public Long getS2c_pkts() {
+ public long getS2c_pkts() {
return s2c_pkts;
}
- public void setS2c_pkts(Long s2c_pkts) {
+ public void setS2c_pkts(long s2c_pkts) {
this.s2c_pkts = s2c_pkts;
}
- public Long getC2s_bytes() {
+ public long getC2s_bytes() {
return c2s_bytes;
}
- public void setC2s_bytes(Long c2s_bytes) {
+ public void setC2s_bytes(long c2s_bytes) {
this.c2s_bytes = c2s_bytes;
}
- public Long getS2c_bytes() {
+ public long getS2c_bytes() {
return s2c_bytes;
}
- public void setS2c_bytes(Long s2c_bytes) {
+ public void setS2c_bytes(long s2c_bytes) {
this.s2c_bytes = s2c_bytes;
}
- public Long getC2s_fragments() {
+ public long getC2s_fragments() {
return c2s_fragments;
}
- public void setC2s_fragments(Long c2s_fragments) {
+ public void setC2s_fragments(long c2s_fragments) {
this.c2s_fragments = c2s_fragments;
}
- public Long getS2c_fragments() {
+ public long getS2c_fragments() {
return s2c_fragments;
}
- public void setS2c_fragments(Long s2c_fragments) {
+ public void setS2c_fragments(long s2c_fragments) {
this.s2c_fragments = s2c_fragments;
}
- public Long getC2s_tcp_lost_bytes() {
+ public long getC2s_tcp_lost_bytes() {
return c2s_tcp_lost_bytes;
}
- public void setC2s_tcp_lost_bytes(Long c2s_tcp_lost_bytes) {
+ public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) {
this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
}
- public Long getS2c_tcp_lost_bytes() {
+ public long getS2c_tcp_lost_bytes() {
return s2c_tcp_lost_bytes;
}
- public void setS2c_tcp_lost_bytes(Long s2c_tcp_lost_bytes) {
+ public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) {
this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
}
- public Long getC2s_tcp_ooorder_pkts() {
+ public long getC2s_tcp_ooorder_pkts() {
return c2s_tcp_ooorder_pkts;
}
- public void setC2s_tcp_ooorder_pkts(Long c2s_tcp_ooorder_pkts) {
+ public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) {
this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
}
- public Long getS2c_tcp_ooorder_pkts() {
+ public long getS2c_tcp_ooorder_pkts() {
return s2c_tcp_ooorder_pkts;
}
- public void setS2c_tcp_ooorder_pkts(Long s2c_tcp_ooorder_pkts) {
+ public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) {
this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
}
- public Long getC2s_tcp_retransmitted_pkts() {
+ public long getC2s_tcp_retransmitted_pkts() {
return c2s_tcp_retransmitted_pkts;
}
- public void setC2s_tcp_retransmitted_pkts(Long c2s_tcp_retransmitted_pkts) {
+ public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) {
this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
}
- public Long getS2c_tcp_retransmitted_pkts() {
+ public long getS2c_tcp_retransmitted_pkts() {
return s2c_tcp_retransmitted_pkts;
}
- public void setS2c_tcp_retransmitted_pkts(Long s2c_tcp_retransmitted_pkts) {
+ public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) {
this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
}
- public Long getC2s_tcp_retransmitted_bytes() {
+ public long getC2s_tcp_retransmitted_bytes() {
return c2s_tcp_retransmitted_bytes;
}
- public void setC2s_tcp_retransmitted_bytes(Long c2s_tcp_retransmitted_bytes) {
+ public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) {
this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
}
- public Long getS2c_tcp_retransmitted_bytes() {
+ public long getS2c_tcp_retransmitted_bytes() {
return s2c_tcp_retransmitted_bytes;
}
- public void setS2c_tcp_retransmitted_bytes(Long s2c_tcp_retransmitted_bytes) {
+ public void setS2c_tcp_retransmitted_bytes(long s2c_tcp_retransmitted_bytes) {
this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
}
- public String getClient_ip_sketch() {
- return client_ip_sketch;
- }
-
- public void setClient_ip_sketch(String client_ip_sketch) {
- this.client_ip_sketch = client_ip_sketch;
- }
}
diff --git a/src/main/java/com/zdjizhi/common/pojo/Metrics.java b/src/main/java/com/zdjizhi/common/pojo/Metrics.java
index 58ecbfb..ae69597 100644
--- a/src/main/java/com/zdjizhi/common/pojo/Metrics.java
+++ b/src/main/java/com/zdjizhi/common/pojo/Metrics.java
@@ -10,14 +10,14 @@ public class Metrics {
private String name;
private Tags tags;
private Fields fields;
- private long timestamp;
+ private long timestamp_ms;
- public Metrics(String name, Tags tags, Fields fields, long timestamp) {
+ public Metrics(String name, Tags tags, Fields fields, long timestamp_ms) {
this.name = name;
this.tags = tags;
this.fields = fields;
- this.timestamp = timestamp;
+ this.timestamp_ms = timestamp_ms;
}
public String getName() {
@@ -44,11 +44,11 @@ public class Metrics {
this.fields = fields;
}
- public long getTimestamp() {
- return timestamp;
+ public long getTimestamp_ms() {
+ return timestamp_ms;
}
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
+ public void setTimestamp_ms(long timestamp_ms) {
+ this.timestamp_ms = timestamp_ms;
}
}
diff --git a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
index 0f34770..d1a60c9 100644
--- a/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
+++ b/src/main/java/com/zdjizhi/topology/ApplicationProtocolTopology.java
@@ -2,25 +2,31 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
+import com.zdjizhi.common.config.MergeConfigs;
+import com.zdjizhi.common.config.MergeConfiguration;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.functions.filter.DataTypeFilter;
import com.zdjizhi.utils.functions.keyby.DimensionKeyBy;
-import com.zdjizhi.utils.functions.map.MetricsParseMap;
import com.zdjizhi.utils.functions.map.ResultFlatMap;
+import com.zdjizhi.utils.functions.process.ParsingData;
import com.zdjizhi.utils.functions.statistics.DispersionCountWindow;
import com.zdjizhi.utils.functions.statistics.MergeCountWindow;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.eventtime.*;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
+import java.time.Duration;
+
+import static com.zdjizhi.common.config.MergeConfigs.*;
/**
* @author qidaijie
@@ -33,35 +39,56 @@ public class ApplicationProtocolTopology {
public static void main(String[] args) {
try {
+
+ // param check
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Error: Not found properties path. " +
+ "\nUsage: flink -c xxx xxx.jar app.properties.");
+ }
+
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
- //解析原始日志
- DataStream streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
- .setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC);
-
- SingleOutputStreamOperator appProtocolFilter = streamSource.filter(new DataTypeFilter())
- .name("appProtocolFilter").setParallelism(GlobalConfig.SOURCE_PARALLELISM);
+ ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final MergeConfiguration fusionConfiguration = new MergeConfiguration(config);
- SingleOutputStreamOperator> parseDataMap = appProtocolFilter.map(new MetricsParseMap())
- .name("ParseDataMap").setParallelism(GlobalConfig.PARSE_PARALLELISM);
+ //水印
+ WatermarkStrategy> strategyForSession = WatermarkStrategy
+ .>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(WARTERMARK_MAX_ORDERNESS)))
+ .withTimestampAssigner((element, timestamp) -> element.f2);
- SingleOutputStreamOperator dispersionCountWindow = parseDataMap.keyBy(new DimensionKeyBy())
- .window(TumblingProcessingTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
+ //数据源
+ DataStream streamSource = environment.addSource(
+ KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
+ config.get(SOURCE_KAFKA_TOPIC),
+ config.get(STARTUP_MODE)));
+
+ //解析数据
+ SingleOutputStreamOperator> parseDataProcess = streamSource.process(new ParsingData())
+ .assignTimestampsAndWatermarks(strategyForSession)
+ .name("ParseDataProcess");
+
+ //增量聚合窗口
+ SingleOutputStreamOperator dispersionCountWindow = parseDataProcess.keyBy(new DimensionKeyBy())
+ .window(TumblingEventTimeWindows.of(Time.seconds(config.get(MergeConfigs.COUNT_WINDOW_TIME))))
.reduce(new DispersionCountWindow(), new MergeCountWindow())
- .name("DispersionCountWindow")
- .setParallelism(GlobalConfig.WINDOW_PARALLELISM);
+ .name("DispersionCountWindow");
+ //拆分数据
SingleOutputStreamOperator resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
- .name("ResultFlatMap").setParallelism(GlobalConfig.SINK_PARALLELISM);
+ .name("ResultFlatMap");
+ //输出
+ resultFlatMap.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
+ config.get(SINK_KAFKA_TOPIC),
+ config.get(LOG_FAILURES_ONLY)));
- resultFlatMap.addSink(KafkaProducer.getKafkaProducer())
- .setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC);
-
- environment.execute(args[0]);
+ environment.execute(config.get(JOB_NAME));
} catch (Exception e) {
- logger.error("This Flink task start ERROR! Exception information is :" + e);
+ logger.error("This Flink task start ERROR! Exception information is :");
+ e.printStackTrace();
}
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java b/src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java
deleted file mode 100644
index cbf9572..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/filter/DataTypeFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.zdjizhi.utils.functions.filter;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson2.JSONPath;
-import com.alibaba.fastjson2.JSONReader;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions.filter
- * @Description:
- * @date 2023/4/1919:02
- */
-public class DataTypeFilter implements FilterFunction {
- private static final Log logger = LogFactory.get();
-
- private static final String dataTypeExpr = "[?(@.name = 'traffic_application_protocol_stat')]";
-
- @Override
- public boolean filter(String message) throws Exception {
- boolean protocolData = false;
- try {
- if (StringUtil.isNotBlank(message)) {
- Object name = JSONPath.eval(message, dataTypeExpr);
- if (name != null) {
- protocolData = true;
- }
- }
- } catch (RuntimeException e) {
- logger.error("Parsing metric data is abnormal! The exception message is:" + e.getMessage());
- }
- return protocolData;
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java b/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java
index 4393729..eed832f 100644
--- a/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java
+++ b/src/main/java/com/zdjizhi/utils/functions/keyby/DimensionKeyBy.java
@@ -4,6 +4,9 @@ import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Tags;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.sql.Timestamp;
/**
* @author qidaijie
@@ -11,10 +14,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
* @Description:
* @date 2021/7/2112:13
*/
-public class DimensionKeyBy implements KeySelector, String> {
+public class DimensionKeyBy implements KeySelector, String> {
@Override
- public String getKey(Tuple2 value) throws Exception {
+ public String getKey(Tuple3 value) throws Exception {
//以map拼接的key分组
return value.f0.toString();
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java b/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java
deleted file mode 100644
index 48d8757..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/map/MetricsParseMap.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.zdjizhi.utils.functions.map;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.zdjizhi.common.pojo.Fields;
-import com.zdjizhi.common.pojo.Tags;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class MetricsParseMap implements MapFunction> {
- private static final Log logger = LogFactory.get();
-
- @Override
- public Tuple2 map(String message) {
- try {
- JSONObject originalLog = JSON.parseObject(message);
- Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class);
- Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
-
- String appFullPath = tags.getApp_name();
- if (StringUtil.isNotBlank(appFullPath)) {
- String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
- String protocolLabel = tags.getProtocol_stack_id();
-
- tags.setApp_name(appName);
- tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath));
- }
-
- return new Tuple2<>(tags, fields);
- } catch (RuntimeException e) {
- logger.error("An error occurred in the original log parsing reorganization,error message is:" + e);
- return new Tuple2<>(null, null);
- }
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
index 22f2f51..28c01f5 100644
--- a/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
+++ b/src/main/java/com/zdjizhi/utils/functions/map/ResultFlatMap.java
@@ -3,9 +3,6 @@ package com.zdjizhi.utils.functions.map;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
-import com.alibaba.fastjson2.JSONWriter;
-import com.zdjizhi.common.config.GlobalConfig;
-import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.StringUtil;
@@ -20,9 +17,14 @@ import org.apache.flink.util.Collector;
*/
public class ResultFlatMap implements FlatMapFunction {
private static final Log logger = LogFactory.get();
+ /**
+ * 协议分隔符,需要转义
+ */
+ private static final String PROTOCOL_SPLITTER = "\\.";
+
@Override
- public void flatMap(Metrics metrics, Collector out) throws Exception {
+ public void flatMap(Metrics metrics, Collector out) {
try {
Tags tags = metrics.getTags();
String protocolStackId = tags.getProtocol_stack_id();
@@ -30,7 +32,7 @@ public class ResultFlatMap implements FlatMapFunction {
tags.setApp_name(null);
StringBuilder stringBuilder = new StringBuilder();
- String[] protocolIds = protocolStackId.split(GlobalConfig.PROTOCOL_SPLITTER);
+ String[] protocolIds = protocolStackId.split(PROTOCOL_SPLITTER);
int protocolIdsNum = protocolIds.length;
for (int i = 0; i < protocolIdsNum - 1; i++) {
if (StringUtil.isBlank(stringBuilder.toString())) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
new file mode 100644
index 0000000..257915c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
@@ -0,0 +1,47 @@
+package com.zdjizhi.utils.functions.process;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.JSONPath;
+import com.zdjizhi.common.pojo.Fields;
+import com.zdjizhi.common.pojo.Tags;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+public class ParsingData extends ProcessFunction> {
+ private static final Log logger = LogFactory.get();
+
+ private static final String dataTypeExpr = "[?(@.name = 'traffic_application_protocol_stat')]";
+
+ @Override
+ public void processElement(String value, ProcessFunction>.Context ctx, Collector> out) {
+ try {
+ if (StringUtil.isNotBlank(value)) {
+ Object isProtocolData = JSONPath.eval(value, dataTypeExpr);
+ if (isProtocolData != null) {
+ JSONObject originalLog = JSON.parseObject(value);
+ Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class);
+ Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
+ Long timestamp_ms = originalLog.getLong("timestamp_ms");
+
+ String appFullPath = tags.getApp_name();
+ if (StringUtil.isNotBlank(appFullPath)) {
+ String appName = appFullPath.substring(appFullPath.lastIndexOf(".") + 1);
+ String protocolLabel = tags.getProtocol_stack_id();
+
+ tags.setApp_name(appName);
+ tags.setProtocol_stack_id(protocolLabel.concat(".").concat(appFullPath));
+ }
+
+ out.collect(new Tuple3<>(tags, fields, timestamp_ms));
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("Parsing application_protocol_stat data is abnormal! The exception message is: {}", e.getMessage());
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
index 57ebde1..44276e2 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/DispersionCountWindow.java
@@ -3,10 +3,12 @@ package com.zdjizhi.utils.functions.statistics;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.pojo.Fields;
+import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
import com.zdjizhi.utils.general.MetricUtil;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
/**
* @author qidaijie
@@ -14,21 +16,24 @@ import org.apache.flink.api.java.tuple.Tuple2;
* @Description:
* @date 2023/4/2314:02
*/
-public class DispersionCountWindow implements ReduceFunction> {
+public class DispersionCountWindow implements ReduceFunction> {
private static final Log logger = LogFactory.get();
+
@Override
- public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
+ public Tuple3 reduce(Tuple3 value1, Tuple3 value2) {
try {
Fields cacheData = value1.f1;
Fields newData = value2.f1;
Fields metricsResult = MetricUtil.statisticsMetrics(cacheData, newData);
- return new Tuple2<>(value1.f0, metricsResult);
+ return new Tuple3<>(value1.f0, metricsResult, value1.f2);
} catch (RuntimeException e) {
logger.error("An exception occurred during incremental aggregation! The message is:" + e.getMessage());
return value1;
}
}
+
+
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
index e3179a7..4766000 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/MergeCountWindow.java
@@ -2,11 +2,12 @@ package com.zdjizhi.utils.functions.statistics;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
+import com.zdjizhi.common.config.MergeConfigs;
import com.zdjizhi.common.pojo.Fields;
import com.zdjizhi.common.pojo.Metrics;
import com.zdjizhi.common.pojo.Tags;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@@ -17,19 +18,33 @@ import org.apache.flink.util.Collector;
* @Description:
* @date 2023/4/2314:43
*/
-public class MergeCountWindow extends ProcessWindowFunction, Metrics, String, TimeWindow> {
+public class MergeCountWindow extends ProcessWindowFunction, Metrics, String, TimeWindow> {
private static final Log logger = LogFactory.get();
+ private String NAME = null;
+
@Override
- public void process(String windowKey, Context context, Iterable> input, Collector output) throws Exception {
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ final Configuration configuration = (Configuration) getRuntimeContext()
+ .getExecutionConfig().getGlobalJobParameters();
+
+ NAME = configuration.get(MergeConfigs.MEASUREMENT_NAME);
+
+ }
+
+ @Override
+ public void process(String windowKey, Context context, Iterable> input, Collector output) {
try {
- Long endTime = context.window().getStart() / 1000;
- for (Tuple2 tuple : input) {
+ long timestamp_ms = context.window().getStart();
+ for (Tuple3 tuple : input) {
Tags tags = tuple.f0;
Fields fields = tuple.f1;
- Metrics metrics = new Metrics(GlobalConfig.MEASUREMENT_NAME, tags, fields, endTime);
+ Metrics metrics = new Metrics(NAME, tags, fields, timestamp_ms);
output.collect(metrics);
}
+
} catch (RuntimeException e) {
logger.error("An exception occurred in the process of full data aggregation! The message is:" + e.getMessage());
}
diff --git a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
index 6a48bcf..dc3c0c6 100644
--- a/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
+++ b/src/main/java/com/zdjizhi/utils/general/MetricUtil.java
@@ -2,13 +2,7 @@ package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.common.pojo.Fields;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.datasketches.hll.HllSketch;
-import org.apache.datasketches.hll.Union;
-
-import java.util.Base64;
/**
@@ -19,8 +13,6 @@ import java.util.Base64;
*/
public class MetricUtil {
private static final Log logger = LogFactory.get();
- private static final String METRICS_DEFAULT_TYPE = "agent";
-
/**
* 用于对业务指标进行统计
@@ -30,106 +22,66 @@ public class MetricUtil {
*/
public static Fields statisticsMetrics(Fields cacheData, Fields newData) {
- Long sessions = MetricUtil.longSum(cacheData.getSessions(), newData.getSessions());
+ long sessions = MetricUtil.longSum(cacheData.getSessions(), newData.getSessions());
- Long inBytes = MetricUtil.longSum(cacheData.getIn_bytes(), newData.getIn_bytes());
- Long outBytes = MetricUtil.longSum(cacheData.getOut_bytes(), newData.getOut_bytes());
- Long inPkts = MetricUtil.longSum(cacheData.getIn_pkts(), newData.getIn_pkts());
- Long outPkts = MetricUtil.longSum(cacheData.getOut_pkts(), newData.getOut_pkts());
+ long inBytes = MetricUtil.longSum(cacheData.getIn_bytes(), newData.getIn_bytes());
+ long outBytes = MetricUtil.longSum(cacheData.getOut_bytes(), newData.getOut_bytes());
+ long inPkts = MetricUtil.longSum(cacheData.getIn_pkts(), newData.getIn_pkts());
+ long outPkts = MetricUtil.longSum(cacheData.getOut_pkts(), newData.getOut_pkts());
- Long c2sBytes = MetricUtil.longSum(cacheData.getC2s_bytes(), newData.getC2s_bytes());
- Long s2cBytes = MetricUtil.longSum(cacheData.getS2c_bytes(), newData.getS2c_bytes());
- Long c2sPkts = MetricUtil.longSum(cacheData.getC2s_pkts(), newData.getC2s_pkts());
- Long s2cPkts = MetricUtil.longSum(cacheData.getS2c_pkts(), newData.getS2c_pkts());
+ long c2sBytes = MetricUtil.longSum(cacheData.getC2s_bytes(), newData.getC2s_bytes());
+ long s2cBytes = MetricUtil.longSum(cacheData.getS2c_bytes(), newData.getS2c_bytes());
+ long c2sPkts = MetricUtil.longSum(cacheData.getC2s_pkts(), newData.getC2s_pkts());
+ long s2cPkts = MetricUtil.longSum(cacheData.getS2c_pkts(), newData.getS2c_pkts());
- Long c2sFragments = MetricUtil.longSum(cacheData.getC2s_fragments(), newData.getC2s_fragments());
- Long s2cFragments = MetricUtil.longSum(cacheData.getS2c_fragments(), newData.getS2c_fragments());
+ long c2sFragments = MetricUtil.longSum(cacheData.getC2s_fragments(), newData.getC2s_fragments());
+ long s2cFragments = MetricUtil.longSum(cacheData.getS2c_fragments(), newData.getS2c_fragments());
- Long c2sTcpLostBytes = MetricUtil.longSum(cacheData.getC2s_tcp_lost_bytes(), newData.getC2s_tcp_lost_bytes());
- Long s2cTcpLostBytes = MetricUtil.longSum(cacheData.getS2c_tcp_lost_bytes(), newData.getS2c_tcp_lost_bytes());
+ long c2sTcpLostBytes = MetricUtil.longSum(cacheData.getC2s_tcp_lost_bytes(), newData.getC2s_tcp_lost_bytes());
+ long s2cTcpLostBytes = MetricUtil.longSum(cacheData.getS2c_tcp_lost_bytes(), newData.getS2c_tcp_lost_bytes());
- Long c2sTcpooorderPkts = MetricUtil.longSum(cacheData.getC2s_tcp_ooorder_pkts(), newData.getC2s_tcp_ooorder_pkts());
- Long s2cTcpooorderPkts = MetricUtil.longSum(cacheData.getS2c_tcp_ooorder_pkts(), newData.getS2c_tcp_ooorder_pkts());
+ long c2sTcpooorderPkts = MetricUtil.longSum(cacheData.getC2s_tcp_ooorder_pkts(), newData.getC2s_tcp_ooorder_pkts());
+ long s2cTcpooorderPkts = MetricUtil.longSum(cacheData.getS2c_tcp_ooorder_pkts(), newData.getS2c_tcp_ooorder_pkts());
- Long c2sTcpretransmittedPkts = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_pkts(), newData.getC2s_tcp_retransmitted_pkts());
- Long s2cTcpretransmittedPkts = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_pkts(), newData.getS2c_tcp_retransmitted_pkts());
+ long c2sTcpretransmittedPkts = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_pkts(), newData.getC2s_tcp_retransmitted_pkts());
+ long s2cTcpretransmittedPkts = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_pkts(), newData.getS2c_tcp_retransmitted_pkts());
- Long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes());
- Long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes());
+ long c2sTcpretransmittedBytes = MetricUtil.longSum(cacheData.getC2s_tcp_retransmitted_bytes(), newData.getC2s_tcp_retransmitted_bytes());
+ long s2cTcpretransmittedBytes = MetricUtil.longSum(cacheData.getS2c_tcp_retransmitted_bytes(), newData.getS2c_tcp_retransmitted_bytes());
- if (METRICS_DEFAULT_TYPE.equals(GlobalConfig.METICS_DATA_SOURCE)) {
- String clientIpSketch = MetricUtil.hllSketchUnion(cacheData.getClient_ip_sketch(), newData.getClient_ip_sketch());
- return new Fields(sessions,
- inBytes, outBytes, inPkts, outPkts,
- c2sPkts, s2cPkts, c2sBytes, s2cBytes,
- c2sFragments, s2cFragments,
- c2sTcpLostBytes, s2cTcpLostBytes,
- c2sTcpooorderPkts, s2cTcpooorderPkts,
- c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
- c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
- clientIpSketch);
- } else {
- return new Fields(sessions,
- inBytes, outBytes, inPkts, outPkts,
- c2sPkts, s2cPkts, c2sBytes, s2cBytes,
- c2sFragments, s2cFragments,
- c2sTcpLostBytes, s2cTcpLostBytes,
- c2sTcpooorderPkts, s2cTcpooorderPkts,
- c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
- c2sTcpretransmittedBytes, s2cTcpretransmittedBytes,
- null);
- }
+ return new Fields(sessions,
+ inBytes, outBytes, inPkts, outPkts,
+ c2sPkts, s2cPkts, c2sBytes, s2cBytes,
+ c2sFragments, s2cFragments,
+ c2sTcpLostBytes, s2cTcpLostBytes,
+ c2sTcpooorderPkts, s2cTcpooorderPkts,
+ c2sTcpretransmittedPkts, s2cTcpretransmittedPkts,
+ c2sTcpretransmittedBytes, s2cTcpretransmittedBytes);
}
/**
* Long类型的数据求和
*
- * @param value1 第一个值
- * @param value2 第二个值
- * @return value1 + value2
+ * @param cacheData 缓存中的值
+ * @param newData 新来数据的值
+ * @return cacheData + newData
*/
- private static Long longSum(Long value1, Long value2) {
- Long result;
+ private static long longSum(long cacheData, long newData) {
+
+ long result;
try {
- if (value1 >= 0 && value2 >= 0) {
- result = value1 + value2;
+ if (cacheData >= 0 && newData >= 0) {
+ result = cacheData + newData;
} else {
- result = value1;
+ result = cacheData;
}
} catch (RuntimeException e) {
- logger.error("Abnormal sending of traffic indicator statistics! The message is:" + e.getMessage());
- result = value1;
+ logger.error("Abnormal sending of traffic indicator statistics! The message is:{}", e);
+ result = cacheData;
}
return result;
}
- /**
- * @param cacheHll 缓存的sketch
- * @param newHll 聚合后的sketch
- * @return 合并后的sketch
- */
- private static String hllSketchUnion(String cacheHll, String newHll) {
- Union union = new Union(12);
- try {
- if (StringUtil.isNotBlank(cacheHll)) {
- byte[] cacheHllBytes = Base64.getDecoder().decode(cacheHll);
- HllSketch cacheSketch = HllSketch.heapify(cacheHllBytes);
- union.update(cacheSketch);
- }
-
- if (StringUtil.isNotBlank(newHll)) {
- byte[] newHllBytes = Base64.getDecoder().decode(newHll);
- HllSketch newSketch = HllSketch.heapify(newHllBytes);
- union.update(newSketch);
- }
- return Base64.getEncoder().encodeToString(union.getResult().toUpdatableByteArray());
-
- } catch (RuntimeException e) {
- logger.error("Merge hllSketch results abnormal! The message is:" + e.getMessage());
- return null;
- }
- }
-
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
deleted file mode 100644
index 877b2e6..0000000
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.zdjizhi.utils.kafka;
-
-import com.zdjizhi.common.config.GlobalConfig;
-import org.apache.kafka.common.config.SslConfigs;
-
-import java.util.Properties;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.kafka
- * @Description:
- * @date 2021/9/610:37
- */
-class CertUtils {
- /**
- * Kafka SASL认证端口
- */
- private static final String SASL_PORT = "9094";
-
- /**
- * Kafka SSL认证端口
- */
- private static final String SSL_PORT = "9095";
-
- /**
- * 根据连接信息端口判断认证方式。
- *
- * @param servers kafka 连接信息
- * @param properties kafka 连接配置信息
- */
- static void chooseCert(String servers, Properties properties) {
- if (servers.contains(SASL_PORT)) {
- properties.put("security.protocol", "SASL_PLAINTEXT");
- properties.put("sasl.mechanism", "PLAIN");
- properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
- + GlobalConfig.KAFKA_SASL_JAAS_USER + " password=" + GlobalConfig.KAFKA_SASL_JAAS_PIN + ";");
- } else if (servers.contains(SSL_PORT)) {
- properties.put("security.protocol", "SSL");
- properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- properties.put("ssl.keystore.location", GlobalConfig.TOOLS_LIBRARY + "keystore.jks");
- properties.put("ssl.keystore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
- properties.put("ssl.truststore.location", GlobalConfig.TOOLS_LIBRARY + "truststore.jks");
- properties.put("ssl.truststore.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
- properties.put("ssl.key.password", GlobalConfig.KAFKA_SASL_JAAS_PIN);
- }
-
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index b0bc2fb..397814f 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -1,8 +1,8 @@
package com.zdjizhi.utils.kafka;
-import com.zdjizhi.common.config.GlobalConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
@@ -13,35 +13,40 @@ import java.util.Properties;
* @date 2021/6/813:54
*/
public class KafkaConsumer {
- private static Properties createConsumerConfig() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", GlobalConfig.SOURCE_KAFKA_SERVERS);
- properties.put("group.id", GlobalConfig.GROUP_ID);
- properties.put("session.timeout.ms", GlobalConfig.SESSION_TIMEOUT_MS);
- properties.put("max.poll.records", GlobalConfig.MAX_POLL_RECORDS);
- properties.put("max.partition.fetch.bytes", GlobalConfig.MAX_PARTITION_FETCH_BYTES);
- properties.put("partition.discovery.interval.ms", "10000");
-
- CertUtils.chooseCert(GlobalConfig.SOURCE_KAFKA_SERVERS, properties);
-
- return properties;
- }
/**
* 官方序列化kafka数据
*
* @return kafka logs
*/
- public static FlinkKafkaConsumer getKafkaConsumer() {
- FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(GlobalConfig.SOURCE_KAFKA_TOPIC,
- new SimpleStringSchema(), createConsumerConfig());
+ public static FlinkKafkaConsumer getKafkaConsumer(Properties properties, String topic, String startupMode) {
- //随着checkpoint提交,将offset提交到kafka
- kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
+ setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
+ setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280);
- //从消费组当前的offset开始消费
- kafkaConsumer.setStartFromGroupOffsets();
+ FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
+
+ switch (startupMode) {
+ case "group":
+ kafkaConsumer.setStartFromGroupOffsets();
+ break;
+ case "latest":
+ kafkaConsumer.setStartFromLatest();
+ break;
+ case "earliest":
+ kafkaConsumer.setStartFromEarliest();
+ break;
+ default:
+ kafkaConsumer.setStartFromGroupOffsets();
+ }
return kafkaConsumer;
}
+
+ private static void setDefaultConfig(Properties properties, String key, Object value) {
+ if (!properties.contains(key)) {
+ properties.put(key, value);
+ }
+ }
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index ca62061..c7cd3f2 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -1,9 +1,7 @@
package com.zdjizhi.utils.kafka;
-import com.zdjizhi.common.config.GlobalConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Optional;
import java.util.Properties;
@@ -16,33 +14,29 @@ import java.util.Properties;
*/
public class KafkaProducer {
- private static Properties createProducerConfig() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", GlobalConfig.SINK_KAFKA_SERVERS);
- properties.put("acks", GlobalConfig.PRODUCER_ACK);
- properties.put("retries", GlobalConfig.RETRIES);
- properties.put("linger.ms", GlobalConfig.LINGER_MS);
- properties.put("request.timeout.ms", GlobalConfig.REQUEST_TIMEOUT_MS);
- properties.put("batch.size", GlobalConfig.BATCH_SIZE);
- properties.put("buffer.memory", GlobalConfig.BUFFER_MEMORY);
- properties.put("max.request.size", GlobalConfig.MAX_REQUEST_SIZE);
- properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, GlobalConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+ public static FlinkKafkaProducer getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
+ setDefaultConfig(properties, "ack", 1);
+ setDefaultConfig(properties, "retries", 0);
+ setDefaultConfig(properties, "linger.ms", 10);
+ setDefaultConfig(properties, "request.timeout.ms", 30000);
+ setDefaultConfig(properties, "batch.size", 262144);
+ setDefaultConfig(properties, "buffer.memory", 134217728);
+ setDefaultConfig(properties, "max.request.size", 10485760);
+ setDefaultConfig(properties, "compression.type", "snappy");
- CertUtils.chooseCert(GlobalConfig.SINK_KAFKA_SERVERS, properties);
-
- return properties;
- }
-
-
- public static FlinkKafkaProducer getKafkaProducer() {
- FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(
- GlobalConfig.SINK_KAFKA_TOPIC,
+ FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(
+ topic,
new SimpleStringSchema(),
- createProducerConfig(), Optional.empty());
+ properties, Optional.empty());
- //启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们
- kafkaProducer.setLogFailuresOnly(true);
+ kafkaProducer.setLogFailuresOnly(logFailuresOnly);
return kafkaProducer;
}
+
+ private static void setDefaultConfig(Properties properties, String key, Object value) {
+ if (!properties.contains(key)) {
+ properties.put(key, value);
+ }
+ }
}
diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties
index 5a6b542..facffc7 100644
--- a/src/main/java/log4j.properties
+++ b/src/main/java/log4j.properties
@@ -1,14 +1,14 @@
#Log4j
-log4j.rootLogger=error,console,file
+log4j.rootLogger=info,console,file
# 控制台日志设置
log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=error
+log4j.appender.console.Threshold=info
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# 文件日志设置
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.file.Threshold=error
+log4j.appender.file.Threshold=info
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
#路径请用相对路径,做好相关测试输出到应用目下
@@ -18,8 +18,8 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
#MyBatis 配置,com.nis.web.dao是mybatis接口所在包
-log4j.logger.com.nis.web.dao=error
+log4j.logger.com.nis.web.dao=info
#bonecp数据源配置
-log4j.category.com.jolbox=error,console
+log4j.category.com.jolbox=info,console
diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java
new file mode 100644
index 0000000..7a2b5d3
--- /dev/null
+++ b/src/test/java/com/zdjizhi/ConfigTest.java
@@ -0,0 +1,58 @@
+package com.zdjizhi;
+
+import com.zdjizhi.conf.FusionConfiguration;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.util.Collector;
+
+import static com.zdjizhi.conf.FusionConfigs.*;
+
+public class ConfigTest {
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final ParameterTool tool;
+ try {
+ tool = ParameterTool.fromPropertiesFile("D:\\workerspace\\flink\\test.properties");
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final FusionConfiguration fusionConfiguration = new FusionConfiguration(config);
+
+ System.out.println(config.get(SOURCE_KAFKA_TOPIC));
+ System.out.println(config.get(SINK_KAFKA_TOPIC));
+ System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
+ System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
+
+
+ final FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(
+ config.get(SOURCE_KAFKA_TOPIC),
+ new SimpleStringSchema(),
+ fusionConfiguration
+ .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
+
+ final DataStreamSource sourceStream = environment.addSource(kafkaConsumer);
+
+ sourceStream.process(new ProcessFunction() {
+
+ @Override
+ public void processElement(String value, ProcessFunction.Context ctx, Collector out) {
+
+ out.collect(value);
+ }
+ }).print();
+
+
+
+ environment.execute();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/test/java/com/zdjizhi/ConventionalTest.java b/src/test/java/com/zdjizhi/ConventionalTest.java
index 7900d61..287b3fb 100644
--- a/src/test/java/com/zdjizhi/ConventionalTest.java
+++ b/src/test/java/com/zdjizhi/ConventionalTest.java
@@ -1,6 +1,5 @@
package com.zdjizhi;
-import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.utils.StringUtil;
import org.junit.Test;
@@ -22,7 +21,7 @@ public class ConventionalTest {
System.out.println(protocol);
StringBuffer stringBuffer = new StringBuffer();
String appName = "qq_r2";
- String[] protocolIds = protocol.split(GlobalConfig.PROTOCOL_SPLITTER);
+ String[] protocolIds = protocol.split("\\.");
for (String proto : protocolIds) {
if (StringUtil.isBlank(stringBuffer.toString())) {
stringBuffer.append(proto);
diff --git a/src/test/java/com/zdjizhi/DatasketchesTest.java b/src/test/java/com/zdjizhi/DatasketchesTest.java
deleted file mode 100644
index 09cb291..0000000
--- a/src/test/java/com/zdjizhi/DatasketchesTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-package com.zdjizhi;
-
-import cn.hutool.json.JSONUtil;
-import com.alibaba.fastjson2.*;
-import com.zdjizhi.utils.JsonMapper;
-import org.apache.datasketches.hll.HllSketch;
-import org.apache.datasketches.hll.TgtHllType;
-import org.apache.datasketches.hll.Union;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.Test;
-
-import java.lang.instrument.Instrumentation;
-import java.util.*;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2023/3/217:17
- */
-public class DatasketchesTest {
-
- @Test
- public void HllSketchTest() {
- HashSet strings = new HashSet<>();
-
- HllSketch sketch = new HllSketch(12);
-
- for (int i = 0; i < 50; i++) {
- String ip = "192.168.1." + i;
- sketch.update(ip);
- strings.add(ip);
- }
-
- System.out.println(sketch.getEstimate() + "--" + strings.size());
-
- HashSet randomStrings = new HashSet<>();
-
- HllSketch randomSketch = new HllSketch(12);
-
- for (int i = 0; i < 50; i++) {
- String ip = makeIPv4Random();
- randomSketch.update(ip);
- randomStrings.add(ip);
- }
-
- System.out.println(randomSketch.getEstimate() + "--" + randomStrings.size());
- }
-
- @Test
- public void HllSketchUnionTest() {
- HashSet strings = new HashSet<>();
-
- HllSketch sketch = new HllSketch(12);
-
- for (int i = 0; i < 50; i++) {
- String ip = "192.168.1." + i;
- sketch.update(ip);
- strings.add(ip);
- }
-
- HllSketch sketch2 = new HllSketch(12);
-
- for (int i = 0; i < 10; i++) {
- String ip = "192.168.2." + i;
- sketch2.update(ip);
- strings.add(ip);
- }
-
- Union union = new Union(12);
-
- union.update(sketch);
- union.update(sketch2);
- HllSketch sketch_result = HllSketch.heapify(union.getResult().toCompactByteArray());
-
- System.out.println(sketch.getEstimate() + "--" + strings.size());
- System.out.println(sketch2.getEstimate() + "--" + strings.size());
- System.out.println(sketch_result.getEstimate() + "--" + strings.size());
- }
-
- @Test
- public void HllSketchDruidTest() {
- HashMap dataMap = new HashMap<>();
-
- HashSet strings = new HashSet<>();
-
- HllSketch sketch = new HllSketch(12);
-
- for (int i = 0; i < 50; i++) {
- String ip = "192.168.1." + i;
- sketch.update(ip);
- strings.add(ip);
- }
-
- HllSketch sketch2 = new HllSketch(12);
-
- for (int i = 0; i < 10; i++) {
- String ip = "192.168.2." + i;
- sketch2.update(ip);
- strings.add(ip);
- }
-
- Union union = new Union(12);
-
- union.update(sketch);
- union.update(sketch2);
- HllSketch sketch_result1 = HllSketch.heapify(union.getResult().toCompactByteArray());
-
- HllSketch sketch3 = new HllSketch(12);
-
- for (int i = 0; i < 10; i++) {
- String ip = "192.168.3." + i;
- sketch3.update(ip);
- strings.add(ip);
- }
-
- Union union2 = new Union(12);
-
- union2.update(sketch_result1);
- union2.update(sketch3);
- HllSketch sketch_result2 = HllSketch.heapify(union2.getResult().toCompactByteArray());
-
- System.out.println(sketch.getEstimate() + "--" + strings.size());
- System.out.println(sketch2.getEstimate() + "--" + strings.size());
- System.out.println(sketch3.getEstimate() + "--" + strings.size());
- System.out.println(sketch_result1.getEstimate() + "--" + strings.size());
- System.out.println(sketch_result2.getEstimate() + "--" + strings.size());
-
- Result result = new Result();
- result.setC2s_pkt_num(10);
- result.setS2c_pkt_num(10);
- result.setC2s_byte_num(10);
- result.setS2c_byte_num(10);
- result.setStat_time(1679970031);
- result.setSchema_type("HLLSketchMergeTest");
-
- //CompactByte
- result.setIp_object(sketch_result2.toCompactByteArray());
-// System.out.println(result.toString());
- //sendMessage(JsonMapper.toJsonString(result);
-
-
- //UpdatableByte
- result.setIp_object(sketch_result2.toUpdatableByteArray());
-// System.out.println(result.toString());
- //sendMessage(JsonMapper.toJsonString(result);
-
- //Hashmap
- dataMap.put("app_name", "TEST");
- dataMap.put("protocol_stack_id", "HTTP");
- dataMap.put("vsys_id", 1);
- dataMap.put("stat_time", 1681370100);
- dataMap.put("client_ip_sketch", sketch_result2.toUpdatableByteArray());
-
- System.out.println("Jackson:" + JsonMapper.toJsonString(dataMap));
- System.out.println("FastJson2:" + JSONObject.toJSONString(dataMap));
- System.out.println("Hutool:" + JSONUtil.toJsonStr(dataMap) + "\n\n");
-
-
- dataMap.put("client_ip_sketch", Base64.getEncoder().encode(sketch_result2.toUpdatableByteArray()));
- System.out.println("FastJson2 Byte(Base64):" + JSONObject.toJSONString(dataMap));
- System.out.println("Hutool Byte(Base64):" + JSONObject.toJSONString(dataMap));
- System.out.println(JSONUtil.toJsonStr(dataMap));
-
-// sendMessage(JSONObject.toJSONString(dataMap));
- }
-
- @Test
- public void HllSketchStorageTest() {
- TgtHllType hllType = TgtHllType.HLL_4;
-// TgtHllType hllType = TgtHllType.HLL_6;
-// TgtHllType hllType = TgtHllType.HLL_8;
-
- HllSketch sketch4 = new HllSketch(4,hllType);
- HllSketch sketch8 = new HllSketch(8,hllType);
- HllSketch sketch12 = new HllSketch(12,hllType);
- HllSketch sketch16 = new HllSketch(16,hllType);
- HllSketch sketch21 = new HllSketch(21,hllType);
-
- HashSet IPSet = new HashSet<>();
-
- for (int i = 0; i < 500000; i++) {
- String ip = makeIPv4Random();
- IPSet.add(ip);
- sketch4.update(ip);
- sketch8.update(ip);
- sketch12.update(ip);
- sketch16.update(ip);
- sketch21.update(ip);
- }
- System.out.println(IPSet.size());
- System.out.println(sketch4.toString());
- System.out.println(sketch8.toString());
- System.out.println(sketch12.toString());
- System.out.println(sketch16.toString());
- System.out.println(sketch21.toString());
-
- }
-
-
- //随机生成ip
- private static String makeIPv4Random() {
- int v4_1 = new Random().nextInt(255) + 1;
- int v4_2 = new Random().nextInt(100);
- int v4_3 = new Random().nextInt(100);
- int v4_4 = new Random().nextInt(255);
- return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4;
- }
-
- private static void sendMessage(Object message) {
- Properties props = new Properties();
- //kafka地址
- props.put("bootstrap.servers", "192.168.44.12:9092");
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 67108864);
-// props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-// props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- KafkaProducer kafkaProducer = new KafkaProducer(props);
-
- kafkaProducer.send(new ProducerRecord("TRAFFIC-PROTOCOL-TEST", message));
-
- kafkaProducer.close();
- }
-}
-
-class Result {
-
- private String schema_type;
- private long c2s_byte_num;
- private long c2s_pkt_num;
- private long s2c_byte_num;
- private long s2c_pkt_num;
- private long stat_time;
- private byte[] ip_object;
-
- public void setSchema_type(String schema_type) {
- this.schema_type = schema_type;
- }
-
- public void setC2s_byte_num(long c2s_byte_num) {
- this.c2s_byte_num = c2s_byte_num;
- }
-
- public void setC2s_pkt_num(long c2s_pkt_num) {
- this.c2s_pkt_num = c2s_pkt_num;
- }
-
- public void setS2c_byte_num(long s2c_byte_num) {
- this.s2c_byte_num = s2c_byte_num;
- }
-
- public void setS2c_pkt_num(long s2c_pkt_num) {
- this.s2c_pkt_num = s2c_pkt_num;
- }
-
- public void setStat_time(long stat_time) {
- this.stat_time = stat_time;
- }
-
- public void setIp_object(byte[] ip_object) {
- this.ip_object = ip_object;
- }
-
- @Override
- public String toString() {
- return "Result{" +
- "schema_type='" + schema_type + '\'' +
- ", c2s_byte_num=" + c2s_byte_num +
- ", c2s_pkt_num=" + c2s_pkt_num +
- ", s2c_byte_num=" + s2c_byte_num +
- ", s2c_pkt_num=" + s2c_pkt_num +
- ", stat_time=" + stat_time +
- ", ip_object=" + Arrays.toString(ip_object) +
- '}';
- }
-}
\ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/FlagsTest.java b/src/test/java/com/zdjizhi/FlagsTest.java
index 15ee4aa..8449f52 100644
--- a/src/test/java/com/zdjizhi/FlagsTest.java
+++ b/src/test/java/com/zdjizhi/FlagsTest.java
@@ -32,26 +32,25 @@ public class FlagsTest {
@Test
public void bitwiseAND() {
- Long common_flags = 8200L;
+ Long flags = 24712L;
Long clientIsLocal = 8L;
Long serverIsLocal = 16L;
- System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
- System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)+"\n\n");
+ System.out.println("flags & clientIsLocal = " + (flags & clientIsLocal));
+ System.out.println("flags & serverIsLocal = " + (flags & serverIsLocal)+"\n\n");
- common_flags = 16400L;
+ flags = 16400L;
- System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
- System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal));
+ System.out.println("flags & clientIsLocal = " + (flags & clientIsLocal));
+ System.out.println("flags & serverIsLocal = " + (flags & serverIsLocal)+"\n\n");
+ flags = 24712L;
+ System.out.println("flags & c2s = " + (flags & 8192));
+ System.out.println("flags & s2c = " + (flags & 16384));
+ System.out.println("flags & Bidirectional = " + (flags & 32768));
- if ((0L & clientIsLocal) == 0L){
- System.out.println("yes");
- }else {
- System.out.println("no");
- }
}
}
diff --git a/src/test/java/com/zdjizhi/conf/FusionConfigs.java b/src/test/java/com/zdjizhi/conf/FusionConfigs.java
new file mode 100644
index 0000000..ca18112
--- /dev/null
+++ b/src/test/java/com/zdjizhi/conf/FusionConfigs.java
@@ -0,0 +1,38 @@
+package com.zdjizhi.conf;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Constants holder for the job's Kafka-related configuration keys.
+ */
+public class FusionConfigs {
+    /**
+     * The prefix for Kafka properties used in the source.
+     */
+    public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
+
+    /**
+     * The prefix for Kafka properties used in the sink.
+     */
+    public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
+
+    /**
+     * Configuration option for the Kafka topic used in the source.
+     */
+    public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
+            ConfigOptions.key("source.kafka.topic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The Kafka topic used in the source.");
+
+    /**
+     * Configuration option for the Kafka topic used in the sink.
+     */
+    public static final ConfigOption<String> SINK_KAFKA_TOPIC =
+            ConfigOptions.key("sink.kafka.topic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The Kafka topic used in the sink.");
+
+    private FusionConfigs() {
+        // Constants holder; not meant to be instantiated.
+    }
+}
diff --git a/src/test/java/com/zdjizhi/conf/FusionConfiguration.java b/src/test/java/com/zdjizhi/conf/FusionConfiguration.java
new file mode 100644
index 0000000..1e6dcf2
--- /dev/null
+++ b/src/test/java/com/zdjizhi/conf/FusionConfiguration.java
@@ -0,0 +1,41 @@
+package com.zdjizhi.conf;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Properties;
+
+/**
+ * Thin wrapper around Flink's {@link Configuration} that adds prefix-based
+ * extraction of settings into a {@link Properties} object.
+ */
+public class FusionConfiguration {
+    private final Configuration config;
+
+    public FusionConfiguration(final Configuration config) {
+        this.config = config;
+    }
+
+    /**
+     * Collects every configuration entry whose key starts with the given
+     * {@code prefix}, strips the prefix from the key, and returns the result
+     * as a {@link Properties} object. A {@code null} prefix returns every
+     * entry with its key unchanged.
+     *
+     * @param prefix The prefix to filter properties; may be {@code null}.
+     * @return A {@code java.util.Properties} object containing the properties with the specified prefix.
+     */
+    public Properties getProperties(final String prefix) {
+        final Properties props = new Properties();
+        if (prefix == null) {
+            props.putAll(config.toMap());
+            return props;
+        }
+        config.toMap().forEach((key, value) -> {
+            if (key.startsWith(prefix)) {
+                props.setProperty(key.substring(prefix.length()), value);
+            }
+        });
+        return props;
+    }
+
+}