From df696e2f65227b950fcde0a51b7ca0f717b2becd Mon Sep 17 00:00:00 2001 From: doufenghu Date: Mon, 13 May 2024 17:40:20 +0800 Subject: [PATCH] =?UTF-8?q?[fix]=20=E4=BF=AE=E5=A4=8D=E8=87=AA=E5=AE=9A?= =?UTF-8?q?=E4=B9=89kafka=E9=85=8D=E7=BD=AE=E8=A2=AB=E9=BB=98=E8=AE=A4?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E8=A6=86=E7=9B=96=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- properties/application.properties | 32 ++++++++++++++++--- .../zdjizhi/utils/kafka/KafkaConsumer.java | 10 ------ .../zdjizhi/utils/kafka/KafkaProducer.java | 14 +------- src/test/java/com/zdjizhi/ConfigTest.java | 6 ++-- src/test/java/com/zdjizhi/FastJsonTest.java | 2 +- 6 files changed, 34 insertions(+), 32 deletions(-) diff --git a/pom.xml b/pom.xml index 2256fc9..7c34db7 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi app-protocol-stat-traffic-merge - 2.2.1 + 2.2.2 app-protocol-stat-traffic-merge http://www.example.com diff --git a/properties/application.properties b/properties/application.properties index 16c7a25..7fc84d0 100644 --- a/properties/application.properties +++ b/properties/application.properties @@ -1,27 +1,51 @@ -#kafka 接收数据topic +# Kafka Source properties + source.kafka.topic=NETWORK-TRAFFIC-METRIC source.kafka.props.bootstrap.servers=192.168.44.12:9094 source.kafka.props.group.id=appapp-protocol-merge-231109-1 +source.kafka.props.max.poll.records=1000 + +source.kafka.session.timeout.ms=60000 + +source.kafka.max.partition.fetch.bytes=31457280 + source.kafka.props.security.protocol=SASL_PLAINTEXT source.kafka.props.sasl.mechanism=PLAIN source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; -#补全数据 输出 topic +# Kafka Sink properties + sink.kafka.topic=NETWORK-TRAFFIC-METRIC sink.kafka.props.bootstrap.servers=192.168.44.12:9094 +sink.kafka.props.linger.ms=1 + +sink.kafka.acks=1 + +sink.kafka.retries=0 + +sink.kafka.request.timeout.ms=30000 + +sink.kafka.batch.size=262144 + +sink.kafka.buffer.memory=134217728 + +sink.kafka.max.request.size=10485760 + +sink.kafka.compression.type=snappy + sink.kafka.props.security.protocol=SASL_PLAINTEXT sink.kafka.props.sasl.mechanism=PLAIN sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; -count.window.time=5 +count.window.time=1 -watermark.max.orderness=5 \ No newline at end of file +watermark.max.orderness=1 \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index 397814f..ca360bc 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -21,10 +21,6 @@ public class KafkaConsumer { */ public static FlinkKafkaConsumer getKafkaConsumer(Properties properties, String topic, String startupMode) { - setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); - setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000); - setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280); - FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties); switch (startupMode) { @@ -43,10 +39,4 @@ public class KafkaConsumer { return kafkaConsumer; } - - private static void setDefaultConfig(Properties properties, String key, Object value) { - if (!properties.contains(key)) { - properties.put(key, value); - } - } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index d337e04..88cd058 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -15,14 +15,7 @@ import java.util.Properties; public class KafkaProducer { public static FlinkKafkaProducer getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) { - setDefaultConfig(properties, "acks", "1"); - setDefaultConfig(properties, "retries", 0); - setDefaultConfig(properties, "linger.ms", 10); - setDefaultConfig(properties, "request.timeout.ms", 30000); - setDefaultConfig(properties, "batch.size", 262144); - setDefaultConfig(properties, "buffer.memory", 134217728); - setDefaultConfig(properties, "max.request.size", 10485760); - setDefaultConfig(properties, "compression.type", "snappy"); + FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>( topic, @@ -34,9 +27,4 @@ public class KafkaProducer { return kafkaProducer; } - private static void setDefaultConfig(Properties properties, String key, Object value) { - if (!properties.contains(key)) { - properties.put(key, value); - } - } } diff --git a/src/test/java/com/zdjizhi/ConfigTest.java b/src/test/java/com/zdjizhi/ConfigTest.java index 7a2b5d3..53cc764 100644 --- a/src/test/java/com/zdjizhi/ConfigTest.java +++ b/src/test/java/com/zdjizhi/ConfigTest.java @@ -19,15 +19,15 @@ public class ConfigTest { final ParameterTool tool; try { - tool = ParameterTool.fromPropertiesFile("D:\\workerspace\\flink\\test.properties"); + tool = ParameterTool.fromPropertiesFile("/Users/darnell/IdeaProjects/app-protocol-stat-traffic-merge/properties/application.properties"); final Configuration config = tool.getConfiguration(); environment.getConfig().setGlobalJobParameters(config); final FusionConfiguration fusionConfiguration = new FusionConfiguration(config); System.out.println(config.get(SOURCE_KAFKA_TOPIC)); System.out.println(config.get(SINK_KAFKA_TOPIC)); - System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX)); - System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); + System.out.println("#####"+ fusionConfiguration.getProperties( SOURCE_KAFKA_PROPERTIES_PREFIX)); + System.out.println("#####"+ fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); final FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>( diff --git a/src/test/java/com/zdjizhi/FastJsonTest.java b/src/test/java/com/zdjizhi/FastJsonTest.java index 3a6df5c..bf04b83 100644 --- a/src/test/java/com/zdjizhi/FastJsonTest.java +++ b/src/test/java/com/zdjizhi/FastJsonTest.java @@ -77,7 +77,7 @@ public class FastJsonTest { } - @Test + @Test(expected=JSONException.class) public void errorJsonTest() { String message = "{\"fields\":{\"c2s_bytes\":2292,\"c2s_fragments\":0,\"c2s_pkts\":13,\"c2s_tcp_lost_bytes\":0,\"c2s_tcp_ooorder_pkts\":0,\"c2s_tcp_retransmitted_bytes\":0,\"c2s_tcp_retransmitted_pkts\":0,\"ytes\":2292,\"out_pkts\":13,\"s2c_bytes\":4695,\"s2c_fragments\":0,\"s2c_pkts\":12,\"s2c_tcp_lost_bytes\":0,\"s2c_tcp_ooorder_pkts\":0,\"s2c_tcp_retransmitted_bytes\":0,\"s2c_tcp_retransmitraffic_application_protocol_stat\",\"tags\":{\"app_full_path\":\"ssl.https.qq_web.wecom\",\"app_name\":\"app_metric\",\"data_center\":\"center-xxg-7400\",\"device_group\":\"group-xxg-7400\",dc-161\",\"protocol_label\":\"ETHERNET.IPv4.TCP\",\"table_name\":\"traffic_application_protocol_stat\",\"vsys_id\":\"1\"},\"timestamp\":1688708725000}"; JSONObject originalLog = JSON.parseObject(message);