diff --git a/pom.xml b/pom.xml index c5a4e80..606d7b9 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi log-olap-analysis-schema - 20220113-balance + 20220309-balance log-olap-analysis-schema http://www.example.com diff --git a/properties/default_config.properties b/properties/default_config.properties index e5f6b91..4961a5d 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -1,4 +1,4 @@ -#====================Kafka Consumer====================# +#====================Kafka KafkaConsumer====================# #kafka source connection timeout session.timeout.ms=60000 @@ -7,7 +7,7 @@ max.poll.records=3000 #kafka source poll bytes max.partition.fetch.bytes=31457280 -#====================Kafka Producer====================# +#====================Kafka KafkaProducer====================# #producer重试的次数设置 retries=0 @@ -28,11 +28,6 @@ buffer.memory=134217728 #10M max.request.size=10485760 #====================kafka default====================# -#kafka source protocol; SSL or SASL -kafka.source.protocol=SASL - -#kafka sink protocol; SSL or SASL -kafka.sink.protocol=SASL #kafka SASL验证用户名 kafka.user=admin diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java index 325e04d..f6100f3 100644 --- a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java +++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java @@ -33,7 +33,6 @@ public class StreamAggregateConfig { public static final String SINK_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.servers"); public static final String SINK_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.topic"); public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack"); - public static final String KAFKA_SINK_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.sink.protocol"); public static final String KAFKA_USER = StreamAggregateConfigurations.getStringProperty(1, "kafka.user"); public static final String KAFKA_PIN = StreamAggregateConfigurations.getStringProperty(1, "kafka.pin"); public static final String RETRIES = StreamAggregateConfigurations.getStringProperty(1, "retries"); @@ -50,7 +49,6 @@ public class StreamAggregateConfig { public static final String SOURCE_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "source.kafka.servers"); public static final String SOURCE_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "source.kafka.topic"); public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id"); - public static final String KAFKA_SOURCE_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.source.protocol"); public static final String SESSION_TIMEOUT_MS = StreamAggregateConfigurations.getStringProperty(1, "session.timeout.ms"); public static final String MAX_POLL_RECORDS = StreamAggregateConfigurations.getStringProperty(1, "max.poll.records"); public static final String MAX_PARTITION_FETCH_BYTES = StreamAggregateConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java index 98f5c96..07a1e87 100644 --- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java +++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java @@ -4,8 +4,8 @@ import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.StreamAggregateConfig; import com.zdjizhi.utils.functions.*; -import com.zdjizhi.utils.kafka.Consumer; -import com.zdjizhi.utils.kafka.Producer; +import com.zdjizhi.utils.kafka.KafkaConsumer; +import com.zdjizhi.utils.kafka.KafkaProducer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; @@ -35,7 +35,7 @@ public class StreamAggregateTopology { //两个输出之间的最大时间 (单位milliseconds) environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT); - DataStream streamSource = environment.addSource(Consumer.getKafkaConsumer()) + DataStream streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()) .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM); SingleOutputStreamOperator> parseDataMap = streamSource.map(new ParseMapFunction()) @@ -56,7 +56,7 @@ public class StreamAggregateTopology { .name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM); secondCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.SINK_PARALLELISM) - .addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.SINK_PARALLELISM); + .addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.SINK_PARALLELISM); environment.execute(args[0]); } catch (Exception e) { diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java index 2608187..cbc92f4 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java +++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java @@ -12,24 +12,36 @@ import java.util.Properties; * @date 2021/9/610:37 */ class CertUtils { - static void chooseCert(String type, Properties properties) { - switch (type) { - case "SSL": - properties.put("security.protocol", "SSL"); - properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - properties.put("ssl.keystore.location", StreamAggregateConfig.TOOLS_LIBRARY + "keystore.jks"); - properties.put("ssl.keystore.password", StreamAggregateConfig.KAFKA_PIN); - properties.put("ssl.truststore.location", StreamAggregateConfig.TOOLS_LIBRARY + "truststore.jks"); - properties.put("ssl.truststore.password", StreamAggregateConfig.KAFKA_PIN); - properties.put("ssl.key.password", StreamAggregateConfig.KAFKA_PIN); - break; - case "SASL": - properties.put("security.protocol", "SASL_PLAINTEXT"); - properties.put("sasl.mechanism", "PLAIN"); - properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" - + StreamAggregateConfig.KAFKA_USER + " password=" + StreamAggregateConfig.KAFKA_PIN + ";"); - break; - default: + /** + * Kafka SASL认证端口 + */ + private static final String SASL_PORT = "9094"; + + /** + * Kafka SSL认证端口 + */ + private static final String SSL_PORT = "9095"; + + /** + * 根据连接信息端口判断认证方式。 + * + * @param servers kafka 连接信息 + * @param properties kafka 连接配置信息 + */ + static void chooseCert(String servers, Properties properties) { + if (servers.contains(SASL_PORT)) { + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + StreamAggregateConfig.KAFKA_USER + " password=" + StreamAggregateConfig.KAFKA_PIN + ";"); + } else if (servers.contains(SSL_PORT)) { + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", StreamAggregateConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", StreamAggregateConfig.KAFKA_PIN); + properties.put("ssl.truststore.location", StreamAggregateConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", StreamAggregateConfig.KAFKA_PIN); + properties.put("ssl.key.password", StreamAggregateConfig.KAFKA_PIN); } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java similarity index 93% rename from src/main/java/com/zdjizhi/utils/kafka/Consumer.java rename to src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index 9379a1e..85024e1 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -14,7 +14,7 @@ import java.util.Properties; * @Description: * @date 2021/6/813:54 */ -public class Consumer { +public class KafkaConsumer { private static Properties createConsumerConfig() { Properties properties = new Properties(); properties.put("bootstrap.servers", StreamAggregateConfig.SOURCE_KAFKA_SERVERS); @@ -25,7 +25,7 @@ public class Consumer { properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - CertUtils.chooseCert(StreamAggregateConfig.KAFKA_SOURCE_PROTOCOL,properties); + CertUtils.chooseCert(StreamAggregateConfig.SOURCE_KAFKA_SERVERS, properties); return properties; } diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java similarity index 94% rename from src/main/java/com/zdjizhi/utils/kafka/Producer.java rename to src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index dc407e7..c350439 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -15,7 +15,7 @@ import java.util.Properties; * @Description: * @date 2021/6/814:04 */ -public class Producer { +public class KafkaProducer { private static Properties createProducerConfig() { Properties properties = new Properties(); @@ -29,7 +29,7 @@ public class Producer { properties.put("max.request.size", StreamAggregateConfig.MAX_REQUEST_SIZE); properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, StreamAggregateConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); - CertUtils.chooseCert(StreamAggregateConfig.KAFKA_SINK_PROTOCOL, properties); + CertUtils.chooseCert(StreamAggregateConfig.SINK_KAFKA_SERVERS, properties); return properties; }