diff --git a/pom.xml b/pom.xml
index 08a1647..2256fc9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
app-protocol-stat-traffic-merge
- 2.2.0
+ 2.2.1
app-protocol-stat-traffic-merge
http://www.example.com
@@ -36,7 +36,6 @@
UTF-8
1.13.1
1.0.0
- 1.2.0
5.7.17
3.2.0
1.9.3
diff --git a/properties/application.properties b/properties/application.properties
index 0f66f4e..16c7a25 100644
--- a/properties/application.properties
+++ b/properties/application.properties
@@ -1,5 +1,5 @@
#kafka 接收数据topic
-source.kafka.topic=NETWORK-TRAFFIC-METRIC-TEST
+source.kafka.topic=NETWORK-TRAFFIC-METRIC
source.kafka.props.bootstrap.servers=192.168.44.12:9094
@@ -12,7 +12,7 @@ source.kafka.props.sasl.mechanism=PLAIN
source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
#补全数据 输出 topic
-sink.kafka.topic=APP-PROTOCOL-TEST-RESULT
+sink.kafka.topic=NETWORK-TRAFFIC-METRIC
sink.kafka.props.bootstrap.servers=192.168.44.12:9094
diff --git a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
index e9a40bf..0f06a26 100644
--- a/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
+++ b/src/main/java/com/zdjizhi/utils/functions/process/ParsingData.java
@@ -15,6 +15,18 @@ import org.apache.flink.util.Collector;
public class ParsingData extends ProcessFunction> {
private static final Log logger = LogFactory.get();
+ /**
+ * 适配TSG 24.02日志重组前数据结构,待过期后删除此处代码
+ */
+ @Deprecated
+ private static final String LEGACY_PROTOCOL_KEY_NAME = "protocol_label";
+
+ /**
+ * 适配TSG 24.02日志重组前数据结构,待过期后删除此处代码
+ */
+ @Deprecated
+ private static final String LEGACY_APP_KEY_NAME = "app_full_path";
+
private static final String dataTypeExpr = "[?(@.name = 'traffic_application_protocol_stat')]";
@Override
@@ -24,6 +36,10 @@ public class ParsingData extends ProcessFunction
+ * 待过期后删除此处代码
+ *
+ * @param originalLog 原始Metrics日志
+ */
+ @Deprecated
+ private static void supportingLegacyField(JSONObject originalLog) {
+ JSONObject tags = originalLog.getJSONObject("tags");
+ if (tags.containsKey(LEGACY_PROTOCOL_KEY_NAME)) {
+ tags.put("decoded_path", tags.remove(LEGACY_PROTOCOL_KEY_NAME));
+
+ tags.put("app", tags.remove(LEGACY_APP_KEY_NAME));
+
+ originalLog.put("tags", originalLog.remove("tags"));
+ }
+ }
+
/**
* 避免计算重复的协议,去除Decoded Path(最后一个元素) 与 Application(第一个元素)重复的基础协议。
*
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index c276dc8..d337e04 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -15,7 +15,7 @@ import java.util.Properties;
public class KafkaProducer {
public static FlinkKafkaProducer getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
- setDefaultConfig(properties, "ack", "1");
+ setDefaultConfig(properties, "acks", "1");
setDefaultConfig(properties, "retries", 0);
setDefaultConfig(properties, "linger.ms", 10);
setDefaultConfig(properties, "request.timeout.ms", 30000);
@@ -29,7 +29,7 @@ public class KafkaProducer {
new SimpleStringSchema(),
properties, Optional.empty());
- kafkaProducer.setLogFailuresOnly(logFailuresOnly);
+ kafkaProducer.setLogFailuresOnly(false);
return kafkaProducer;
}
diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties
index facffc7..60d6d10 100644
--- a/src/main/java/log4j.properties
+++ b/src/main/java/log4j.properties
@@ -16,7 +16,7 @@ log4j.appender.file.file=${nis.root}/log/galaxy-name.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
-log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
#MyBatis 配置,com.nis.web.dao是mybatis接口所在包
log4j.logger.com.nis.web.dao=info
#bonecp数据源配置
diff --git a/src/test/java/com/zdjizhi/FastJsonTest.java b/src/test/java/com/zdjizhi/FastJsonTest.java
index 5fd854a..3a6df5c 100644
--- a/src/test/java/com/zdjizhi/FastJsonTest.java
+++ b/src/test/java/com/zdjizhi/FastJsonTest.java
@@ -77,9 +77,8 @@ public class FastJsonTest {
}
-
@Test
- public void errorJsonTest(){
+ public void errorJsonTest() {
String message = "{\"fields\":{\"c2s_bytes\":2292,\"c2s_fragments\":0,\"c2s_pkts\":13,\"c2s_tcp_lost_bytes\":0,\"c2s_tcp_ooorder_pkts\":0,\"c2s_tcp_retransmitted_bytes\":0,\"c2s_tcp_retransmitted_pkts\":0,\"ytes\":2292,\"out_pkts\":13,\"s2c_bytes\":4695,\"s2c_fragments\":0,\"s2c_pkts\":12,\"s2c_tcp_lost_bytes\":0,\"s2c_tcp_ooorder_pkts\":0,\"s2c_tcp_retransmitted_bytes\":0,\"s2c_tcp_retransmitraffic_application_protocol_stat\",\"tags\":{\"app_full_path\":\"ssl.https.qq_web.wecom\",\"app_name\":\"app_metric\",\"data_center\":\"center-xxg-7400\",\"device_group\":\"group-xxg-7400\",dc-161\",\"protocol_label\":\"ETHERNET.IPv4.TCP\",\"table_name\":\"traffic_application_protocol_stat\",\"vsys_id\":\"1\"},\"timestamp\":1688708725000}";
JSONObject originalLog = JSON.parseObject(message);
Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class);
@@ -88,4 +87,16 @@ public class FastJsonTest {
System.out.println(fields.toString());
System.out.println(tags.toString());
}
+
+ @Test
+ public void replaceTest() {
+ String message = "{\"tags\":{\"app_full_path\":\"ssl.https.qq_web.wecom\",\"app_name\":\"app_metric\",\"protocol_label\":\"ETHERNET.IPv4.TCP\",\"vsys_id\":\"1\"},\"timestamp\":1688708725000}";
+ JSONObject originalLog = JSON.parseObject(message);
+ JSONObject tags = originalLog.getJSONObject("tags");
+ System.out.println(tags.toJSONString() + "\n\n");
+
+ tags.replace("protocol_label", "replace test");
+ System.out.println(tags.toJSONString() + "\n\n");
+
+ }
}
diff --git a/src/test/java/com/zdjizhi/LegacyFieldTest.java b/src/test/java/com/zdjizhi/LegacyFieldTest.java
new file mode 100644
index 0000000..5b66e20
--- /dev/null
+++ b/src/test/java/com/zdjizhi/LegacyFieldTest.java
@@ -0,0 +1,51 @@
+package com.zdjizhi;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.zdjizhi.common.pojo.Fields;
+import com.zdjizhi.common.pojo.Tags;
+import org.junit.Test;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2024/3/2614:14
+ */
+public class LegacyFieldTest {
+ @Deprecated
+ private static final String LEGACY_PROTOCOL_KEY_NAME = "protocol_label";
+
+ @Deprecated
+ private static final String LEGACY_APP_KEY_NAME = "app_full_path";
+
+ @Test
+ public void test() {
+ //23.07 version data
+ String message = "{\"fields\":{\"s2c_tcp_retransmitted_pkts\":0,\"sessions\":0},\"name\":\"application_protocol_stat\",\"tags\":{\"data_center\":\"center-xxg-tsgx\",\"device_group\":\"group-xxg-tsgx\",\"device_id\":\"9800165603191151\",\"protocol_label\":\"ETHERNET.IPv4.UDP\",\"app_full_path\":\"google\",\"vsys_id\":1},\"timestamp_ms\":1705907560000}";
+ //24.02 version data
+ //String message = "{\"fields\":{\"s2c_tcp_retransmitted_pkts\":0,\"sessions\":0},\"name\":\"application_protocol_stat\",\"tags\":{\"data_center\":\"center-xxg-tsgx\",\"device_group\":\"group-xxg-tsgx\",\"device_id\":\"9800165603191151\",\"decoded_path\":\"ETHERNET.IPv4.UDP\","app":"google",\"vsys_id\":1},\"timestamp_ms\":1705907560000}";
+ JSONObject originalLog = JSON.parseObject(message);
+ supportingLegacyField(originalLog);
+ Fields fields = JSONObject.parseObject(originalLog.getString("fields"), Fields.class);
+ Tags tags = JSONObject.parseObject(originalLog.getString("tags"), Tags.class);
+
+ JSONObject from = JSONObject.from(tags);
+ System.out.println(from.toJSONString());
+ }
+
+
+ private static void supportingLegacyField(JSONObject originalLog) {
+ JSONObject tags = originalLog.getJSONObject("tags");
+ System.out.println("解析前tags数据:" + tags.toJSONString() + "\n");
+
+ if (tags.containsKey(LEGACY_PROTOCOL_KEY_NAME)) {
+ tags.put("decoded_path", tags.remove(LEGACY_PROTOCOL_KEY_NAME));
+
+ tags.put("app", tags.remove(LEGACY_APP_KEY_NAME));
+
+ originalLog.put("tags", originalLog.remove("tags"));
+ System.out.println("转换后tags数据:" + tags.toJSONString() + "\n");
+ }
+ }
+}