package com.zdjizhi.utils.kafka;

import com.zdjizhi.common.VoipRelationConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Optional;
import java.util.Properties;

/**
 * @author qidaijie
 * @Package com.zdjizhi.utils.kafka
 * @Description: Builds the Flink Kafka producer sink configured from VoipRelationConfig.
 * @date 2021/6/8 14:04
 */
public class KafkaProducer {

    private static Properties createProducerConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", VoipRelationConfig.SINK_KAFKA_SERVERS);
        properties.put("acks", VoipRelationConfig.PRODUCER_ACK);
        properties.put("retries", VoipRelationConfig.RETRIES);
        properties.put("linger.ms", VoipRelationConfig.LINGER_MS);
        properties.put("request.timeout.ms", VoipRelationConfig.REQUEST_TIMEOUT_MS);
        properties.put("batch.size", VoipRelationConfig.BATCH_SIZE);
        properties.put("buffer.memory", VoipRelationConfig.BUFFER_MEMORY);
        properties.put("max.request.size", VoipRelationConfig.MAX_REQUEST_SIZE);
        properties.put("compression.type", VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
        // Select and apply certificate/security settings for the sink Kafka cluster.
        CertUtils.chooseCert(VoipRelationConfig.SINK_KAFKA_SERVERS, properties);
        return properties;
    }

    public static FlinkKafkaProducer<String> getKafkaProducer() {
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                VoipRelationConfig.SINK_KAFKA_TOPIC,
                new SimpleStringSchema(),
                createProducerConfig(),
                Optional.empty());
        // If enabled (true), the producer only logs write failures instead of catching and
        // rethrowing them; false lets failures propagate and fail the sink.
        kafkaProducer.setLogFailuresOnly(false);
        // Attach timestamps to the messages written to Kafka
        // kafkaProducer.setWriteTimestampToKafka(true);
        return kafkaProducer;
    }
}
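
/*
 * Usage sketch (illustrative, not part of the original file): shows how the sink returned by
 * KafkaProducer.getKafkaProducer() would typically be attached to a DataStream<String>.
 * The stand-in source, the sample record, and the job name "voip-relation-sink" are assumptions;
 * only KafkaProducer.getKafkaProducer() comes from this class.
 */
class KafkaProducerUsageSketch {

    public static void main(String[] args) throws Exception {
        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment env =
                org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment();

        // Any DataStream<String> of serialized records can feed the sink; fromElements is just a stand-in source.
        org.apache.flink.streaming.api.datastream.DataStream<String> relationJson =
                env.fromElements("{\"caller\":\"10001\",\"callee\":\"10002\"}");

        // Write the stream to the topic configured in VoipRelationConfig.SINK_KAFKA_TOPIC.
        relationJson.addSink(KafkaProducer.getKafkaProducer());

        env.execute("voip-relation-sink");
    }
}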