新增kafka用户名密码加密 TSG-8835
This commit is contained in:
9
pom.xml
9
pom.xml
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>log-olap-analysis-schema</artifactId>
|
||||
<version>220309-balance</version>
|
||||
<version>220316-encryption</version>
|
||||
|
||||
<name>log-olap-analysis-schema</name>
|
||||
<url>http://www.example.com</url>
|
||||
@@ -226,6 +226,13 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
|
||||
<dependency>
|
||||
<groupId>org.jasypt</groupId>
|
||||
<artifactId>jasypt</artifactId>
|
||||
<version>1.9.3</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
||||
|
||||
|
||||
@@ -28,12 +28,11 @@ buffer.memory=134217728
|
||||
#10M
|
||||
max.request.size=10485760
|
||||
#====================kafka default====================#
|
||||
#kafka SASL验证用户名-加密
|
||||
kafka.user=nsyGpHKGFA4KW0zro9MDdw==
|
||||
|
||||
#kafka SASL验证用户名
|
||||
kafka.user=admin
|
||||
|
||||
#kafka SASL及SSL验证密码
|
||||
kafka.pin=galaxy2019
|
||||
#kafka SASL及SSL验证密码-加密
|
||||
kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
|
||||
#====================Topology Default====================#
|
||||
|
||||
#两个输出之间的最大时间(单位milliseconds)
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
#--------------------------------地址配置------------------------------#
|
||||
|
||||
#管理kafka地址
|
||||
source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
|
||||
source.kafka.servers=192.168.44.12:9094
|
||||
|
||||
#管理输出kafka地址
|
||||
sink.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
|
||||
sink.kafka.servers=192.168.44.12:9094
|
||||
|
||||
#--------------------------------HTTP------------------------------#
|
||||
#kafka 证书地址
|
||||
|
||||
@@ -2,12 +2,19 @@ package com.zdjizhi.common;
|
||||
|
||||
|
||||
import com.zdjizhi.utils.system.StreamAggregateConfigurations;
|
||||
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
|
||||
|
||||
/**
|
||||
* @author Administrator
|
||||
*/
|
||||
public class StreamAggregateConfig {
|
||||
|
||||
private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
|
||||
|
||||
static {
|
||||
encryptor.setPassword("galaxy");
|
||||
}
|
||||
|
||||
public static final String FORMAT_SPLITTER = ",";
|
||||
public static final String PROTOCOL_SPLITTER = "\\.";
|
||||
|
||||
@@ -26,15 +33,19 @@ public class StreamAggregateConfig {
|
||||
public static final Integer SINK_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "sink.parallelism");
|
||||
public static final Integer RANDOM_RANGE_NUM = StreamAggregateConfigurations.getIntProperty(1, "random.range.num");
|
||||
|
||||
/**
|
||||
* Kafka common
|
||||
*/
|
||||
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(StreamAggregateConfigurations.getStringProperty(1, "kafka.user"));
|
||||
public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(StreamAggregateConfigurations.getStringProperty(1, "kafka.pin"));
|
||||
|
||||
|
||||
/**
|
||||
* kafka source
|
||||
* kafka sink config
|
||||
*/
|
||||
public static final String SINK_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.servers");
|
||||
public static final String SINK_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.topic");
|
||||
public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack");
|
||||
public static final String KAFKA_USER = StreamAggregateConfigurations.getStringProperty(1, "kafka.user");
|
||||
public static final String KAFKA_PIN = StreamAggregateConfigurations.getStringProperty(1, "kafka.pin");
|
||||
public static final String RETRIES = StreamAggregateConfigurations.getStringProperty(1, "retries");
|
||||
public static final String LINGER_MS = StreamAggregateConfigurations.getStringProperty(1, "linger.ms");
|
||||
public static final Integer REQUEST_TIMEOUT_MS = StreamAggregateConfigurations.getIntProperty(1, "request.timeout.ms");
|
||||
|
||||
@@ -33,15 +33,15 @@ class CertUtils {
|
||||
properties.put("security.protocol", "SASL_PLAINTEXT");
|
||||
properties.put("sasl.mechanism", "PLAIN");
|
||||
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
|
||||
+ StreamAggregateConfig.KAFKA_USER + " password=" + StreamAggregateConfig.KAFKA_PIN + ";");
|
||||
+ StreamAggregateConfig.KAFKA_SASL_JAAS_USER + " password=" + StreamAggregateConfig.KAFKA_SASL_JAAS_PIN + ";");
|
||||
} else if (servers.contains(SSL_PORT)) {
|
||||
properties.put("security.protocol", "SSL");
|
||||
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
|
||||
properties.put("ssl.keystore.location", StreamAggregateConfig.TOOLS_LIBRARY + "keystore.jks");
|
||||
properties.put("ssl.keystore.password", StreamAggregateConfig.KAFKA_PIN);
|
||||
properties.put("ssl.keystore.password", StreamAggregateConfig.KAFKA_SASL_JAAS_PIN);
|
||||
properties.put("ssl.truststore.location", StreamAggregateConfig.TOOLS_LIBRARY + "truststore.jks");
|
||||
properties.put("ssl.truststore.password", StreamAggregateConfig.KAFKA_PIN);
|
||||
properties.put("ssl.key.password", StreamAggregateConfig.KAFKA_PIN);
|
||||
properties.put("ssl.truststore.password", StreamAggregateConfig.KAFKA_SASL_JAAS_PIN);
|
||||
properties.put("ssl.key.password", StreamAggregateConfig.KAFKA_SASL_JAAS_PIN);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user