diff --git a/pom.xml b/pom.xml
index 2256fc9..7c34db7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.zdjizhi
app-protocol-stat-traffic-merge
- 2.2.1
+ 2.2.2
app-protocol-stat-traffic-merge
http://www.example.com
diff --git a/properties/application.properties b/properties/application.properties
index 16c7a25..7fc84d0 100644
--- a/properties/application.properties
+++ b/properties/application.properties
@@ -1,27 +1,51 @@
-#kafka 接收数据topic
+# Kafka Source properties
+
source.kafka.topic=NETWORK-TRAFFIC-METRIC
source.kafka.props.bootstrap.servers=192.168.44.12:9094
source.kafka.props.group.id=appapp-protocol-merge-231109-1
+source.kafka.props.max.poll.records=1000
+
+source.kafka.session.timeout.ms=60000
+
+source.kafka.max.partition.fetch.bytes=31457280
+
source.kafka.props.security.protocol=SASL_PLAINTEXT
source.kafka.props.sasl.mechanism=PLAIN
source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
-#补全数据 输出 topic
+# Kafka Sink properties
+
sink.kafka.topic=NETWORK-TRAFFIC-METRIC
sink.kafka.props.bootstrap.servers=192.168.44.12:9094
+sink.kafka.props.linger.ms=1
+
+sink.kafka.acks=1
+
+sink.kafka.retries=0
+
+sink.kafka.request.timeout.ms=30000
+
+sink.kafka.batch.size=262144
+
+sink.kafka.buffer.memory=134217728
+
+sink.kafka.max.request.size=10485760
+
+sink.kafka.compression.type=snappy
+
sink.kafka.props.security.protocol=SASL_PLAINTEXT
sink.kafka.props.sasl.mechanism=PLAIN
sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
-count.window.time=5
+count.window.time=1
-watermark.max.orderness=5
\ No newline at end of file
+watermark.max.orderness=1
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index 397814f..ca360bc 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -21,10 +21,6 @@ public class KafkaConsumer {
*/
public static FlinkKafkaConsumer getKafkaConsumer(Properties properties, String topic, String startupMode) {
- setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
- setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
- setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280);
-
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
switch (startupMode) {
@@ -43,10 +39,4 @@ public class KafkaConsumer {
return kafkaConsumer;
}
-
- private static void setDefaultConfig(Properties properties, String key, Object value) {
- if (!properties.contains(key)) {
- properties.put(key, value);
- }
- }
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index d337e04..88cd058 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -15,14 +15,7 @@ import java.util.Properties;
public class KafkaProducer {
public static FlinkKafkaProducer getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
- setDefaultConfig(properties, "acks", "1");
- setDefaultConfig(properties, "retries", 0);
- setDefaultConfig(properties, "linger.ms", 10);
- setDefaultConfig(properties, "request.timeout.ms", 30000);
- setDefaultConfig(properties, "batch.size", 262144);
- setDefaultConfig(properties, "buffer.memory", 134217728);
- setDefaultConfig(properties, "max.request.size", 10485760);
- setDefaultConfig(properties, "compression.type", "snappy");
+
FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(
topic,
@@ -34,9 +27,4 @@ public class KafkaProducer {
return kafkaProducer;
}
- private static void setDefaultConfig(Properties properties, String key, Object value) {
- if (!properties.contains(key)) {
- properties.put(key, value);
- }
- }
}
diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java
index 7a2b5d3..53cc764 100644
--- a/src/test/java/com/zdjizhi/ConfigTest.java
+++ b/src/test/java/com/zdjizhi/ConfigTest.java
@@ -19,15 +19,15 @@ public class ConfigTest {
final ParameterTool tool;
try {
- tool = ParameterTool.fromPropertiesFile("D:\\workerspace\\flink\\test.properties");
+ tool = ParameterTool.fromPropertiesFile("/Users/darnell/IdeaProjects/app-protocol-stat-traffic-merge/properties/application.properties");
final Configuration config = tool.getConfiguration();
environment.getConfig().setGlobalJobParameters(config);
final FusionConfiguration fusionConfiguration = new FusionConfiguration(config);
System.out.println(config.get(SOURCE_KAFKA_TOPIC));
System.out.println(config.get(SINK_KAFKA_TOPIC));
- System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
- System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
+ System.out.println("#####"+ fusionConfiguration.getProperties( SOURCE_KAFKA_PROPERTIES_PREFIX));
+ System.out.println("#####"+ fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
final FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(
diff --git a/src/test/java/com/zdjizhi/FastJsonTest.java b/src/test/java/com/zdjizhi/FastJsonTest.java
index 3a6df5c..bf04b83 100644
--- a/src/test/java/com/zdjizhi/FastJsonTest.java
+++ b/src/test/java/com/zdjizhi/FastJsonTest.java
@@ -77,7 +77,7 @@ public class FastJsonTest {
}
- @Test
+ @Test(expected=JSONException.class)
public void errorJsonTest() {
String message = "{\"fields\":{\"c2s_bytes\":2292,\"c2s_fragments\":0,\"c2s_pkts\":13,\"c2s_tcp_lost_bytes\":0,\"c2s_tcp_ooorder_pkts\":0,\"c2s_tcp_retransmitted_bytes\":0,\"c2s_tcp_retransmitted_pkts\":0,\"ytes\":2292,\"out_pkts\":13,\"s2c_bytes\":4695,\"s2c_fragments\":0,\"s2c_pkts\":12,\"s2c_tcp_lost_bytes\":0,\"s2c_tcp_ooorder_pkts\":0,\"s2c_tcp_retransmitted_bytes\":0,\"s2c_tcp_retransmitraffic_application_protocol_stat\",\"tags\":{\"app_full_path\":\"ssl.https.qq_web.wecom\",\"app_name\":\"app_metric\",\"data_center\":\"center-xxg-7400\",\"device_group\":\"group-xxg-7400\",dc-161\",\"protocol_label\":\"ETHERNET.IPv4.TCP\",\"table_name\":\"traffic_application_protocol_stat\",\"vsys_id\":\"1\"},\"timestamp\":1688708725000}";
JSONObject originalLog = JSON.parseObject(message);