This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
galaxy-tsg-olap-dos-detecti…/src/main/java/com/zdjizhi/utils/KafkaUtils.java

34 lines
1.2 KiB
Java
Raw Normal View History

2021-07-29 10:02:31 +08:00
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Optional;
2021-07-29 10:02:31 +08:00
import java.util.Properties;
public class KafkaUtils {
private static Properties getKafkaSinkProperty(){
2021-09-06 16:19:33 +08:00
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
}
2021-09-06 16:19:33 +08:00
return properties;
2021-07-29 10:02:31 +08:00
}
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
return new FlinkKafkaProducer<String>(
topic,
new SimpleStringSchema(),
getKafkaSinkProperty(),
Optional.empty()
2021-07-29 10:02:31 +08:00
);
}
}