package com.zdjizhi.utils; import com.zdjizhi.common.CommonConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import java.util.Optional; import java.util.Properties; public class KafkaUtils { private static Properties getKafkaSinkProperty(){ Properties properties = new Properties(); properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS); if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){ properties.put("security.protocol", "SASL_PLAINTEXT"); properties.put("sasl.mechanism", "PLAIN"); properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";"); } return properties; } public static FlinkKafkaProducer getKafkaSink(String topic){ FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>( topic, new SimpleStringSchema(), getKafkaSinkProperty(), Optional.empty() ); kafkaProducer.setLogFailuresOnly(true); return kafkaProducer; } }