[fix] 修复自定义kafka配置被默认配置覆盖的问题

This commit is contained in:
doufenghu
2024-05-13 17:40:20 +08:00
parent fc650bb9e5
commit df696e2f65
6 changed files with 34 additions and 32 deletions

View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>app-protocol-stat-traffic-merge</artifactId>
<version>2.2.1</version>
<version>2.2.2</version>
<name>app-protocol-stat-traffic-merge</name>
<url>http://www.example.com</url>

View File

@@ -1,27 +1,51 @@
#kafka 接收数据topic
# Kafka Source properties
source.kafka.topic=NETWORK-TRAFFIC-METRIC
source.kafka.props.bootstrap.servers=192.168.44.12:9094
source.kafka.props.group.id=appapp-protocol-merge-231109-1
source.kafka.props.max.poll.records=1000
source.kafka.session.timeout.ms=60000
source.kafka.max.partition.fetch.bytes=31457280
source.kafka.props.security.protocol=SASL_PLAINTEXT
source.kafka.props.sasl.mechanism=PLAIN
source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
#补全数据 输出 topic
# Kafka Sink properties
sink.kafka.topic=NETWORK-TRAFFIC-METRIC
sink.kafka.props.bootstrap.servers=192.168.44.12:9094
sink.kafka.props.linger.ms=1
sink.kafka.acks=1
sink.kafka.retries=0
sink.kafka.request.timeout.ms=30000
sink.kafka.batch.size=262144
sink.kafka.buffer.memory=134217728
sink.kafka.max.request.size=10485760
sink.kafka.compression.type=snappy
sink.kafka.props.security.protocol=SASL_PLAINTEXT
sink.kafka.props.sasl.mechanism=PLAIN
sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
count.window.time=5
count.window.time=1
watermark.max.orderness=5
watermark.max.orderness=1

View File

@@ -21,10 +21,6 @@ public class KafkaConsumer {
*/
public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) {
setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280);
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
switch (startupMode) {
@@ -43,10 +39,4 @@ public class KafkaConsumer {
return kafkaConsumer;
}
private static void setDefaultConfig(Properties properties, String key, Object value) {
if (!properties.contains(key)) {
properties.put(key, value);
}
}
}

View File

@@ -15,14 +15,7 @@ import java.util.Properties;
public class KafkaProducer {
public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
setDefaultConfig(properties, "acks", "1");
setDefaultConfig(properties, "retries", 0);
setDefaultConfig(properties, "linger.ms", 10);
setDefaultConfig(properties, "request.timeout.ms", 30000);
setDefaultConfig(properties, "batch.size", 262144);
setDefaultConfig(properties, "buffer.memory", 134217728);
setDefaultConfig(properties, "max.request.size", 10485760);
setDefaultConfig(properties, "compression.type", "snappy");
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
topic,
@@ -34,9 +27,4 @@ public class KafkaProducer {
return kafkaProducer;
}
private static void setDefaultConfig(Properties properties, String key, Object value) {
if (!properties.contains(key)) {
properties.put(key, value);
}
}
}

View File

@@ -19,15 +19,15 @@ public class ConfigTest {
final ParameterTool tool;
try {
tool = ParameterTool.fromPropertiesFile("D:\\workerspace\\flink\\test.properties");
tool = ParameterTool.fromPropertiesFile("/Users/darnell/IdeaProjects/app-protocol-stat-traffic-merge/properties/application.properties");
final Configuration config = tool.getConfiguration();
environment.getConfig().setGlobalJobParameters(config);
final FusionConfiguration fusionConfiguration = new FusionConfiguration(config);
System.out.println(config.get(SOURCE_KAFKA_TOPIC));
System.out.println(config.get(SINK_KAFKA_TOPIC));
System.out.println(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
System.out.println(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
System.out.println("#####"+ fusionConfiguration.getProperties( SOURCE_KAFKA_PROPERTIES_PREFIX));
System.out.println("#####"+ fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(

View File

@@ -77,7 +77,7 @@ public class FastJsonTest {
}
@Test
@Test(expected=JSONException.class)
public void errorJsonTest() {
String message = "{\"fields\":{\"c2s_bytes\":2292,\"c2s_fragments\":0,\"c2s_pkts\":13,\"c2s_tcp_lost_bytes\":0,\"c2s_tcp_ooorder_pkts\":0,\"c2s_tcp_retransmitted_bytes\":0,\"c2s_tcp_retransmitted_pkts\":0,\"ytes\":2292,\"out_pkts\":13,\"s2c_bytes\":4695,\"s2c_fragments\":0,\"s2c_pkts\":12,\"s2c_tcp_lost_bytes\":0,\"s2c_tcp_ooorder_pkts\":0,\"s2c_tcp_retransmitted_bytes\":0,\"s2c_tcp_retransmitraffic_application_protocol_stat\",\"tags\":{\"app_full_path\":\"ssl.https.qq_web.wecom\",\"app_name\":\"app_metric\",\"data_center\":\"center-xxg-7400\",\"device_group\":\"group-xxg-7400\",dc-161\",\"protocol_label\":\"ETHERNET.IPv4.TCP\",\"table_name\":\"traffic_application_protocol_stat\",\"vsys_id\":\"1\"},\"timestamp\":1688708725000}";
JSONObject originalLog = JSON.parseObject(message);