This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
galaxy-tsg-olap-log-complet…/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java

71 lines
2.7 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import java.util.Map;
import java.util.Properties;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2021/6/813:54
*/
public class KafkaConsumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
properties.put("group.id", FlowWriteConfig.GROUP_ID);
properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("partition.discovery.interval.ms", "10000");
CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties);
return properties;
}
/**
* 用户序列化kafka数据增加 kafka Timestamp内容。
*
* @return kafka logs -> map
*/
public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer() {
FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
new TimestampDeserializationSchema(), createConsumerConfig());
//随着checkpoint提交将offset提交到kafka
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
//从消费组当前的offset开始消费
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
/**
* 官方序列化kafka数据
*
* @return kafka logs
*/
public static FlinkKafkaConsumer<String> flinkConsumer() {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
}