新增kafka sasl认证机制

This commit is contained in:
wanglihui
2021-09-06 16:19:33 +08:00
parent c5943298bd
commit b4237bb4a9
7 changed files with 45 additions and 85 deletions

View File

@@ -61,4 +61,7 @@ public class CommonConfig {
public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = CommonConfigurations.getIntProperty("static.threshold.schedule.minutes");
public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days");
public static final String SASL_JAAS_CONFIG_USER = CommonConfigurations.getStringProperty("sasl.jaas.config.user");
public static final String SASL_JAAS_CONFIG_PASSWORD = CommonConfigurations.getStringProperty("sasl.jaas.config.password");
}

View File

@@ -20,6 +20,9 @@ public class DosSketchSource {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
return streamExeEnv.addSource(new FlinkKafkaConsumer<String>(
CommonConfig.KAFKA_INPUT_TOPIC_NAME,

View File

@@ -2,8 +2,6 @@ package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
@@ -12,17 +10,8 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkEnvironmentUtils {
public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
public static StreamTableEnvironment getStreamTableEnv() {
static {
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
return StreamTableEnvironment.create(streamExeEnv, settings);
}
}

View File

@@ -9,10 +9,14 @@ import java.util.Properties;
public class KafkaUtils {
private static Properties getKafkaSinkProperty(){
Properties propertiesproducer = new Properties();
propertiesproducer.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
return propertiesproducer;
return properties;
}
public static FlinkKafkaProducer<String> getKafkaSink(String topic){