根据04版补全程序更新P19双写程序。
This commit is contained in:
48
src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
Normal file
48
src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
Normal file
@@ -0,0 +1,48 @@
|
||||
package com.zdjizhi.utils.kafka;
|
||||
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.utils.kafka
|
||||
* @Description:
|
||||
* @date 2021/9/610:37
|
||||
*/
|
||||
class CertUtils {
|
||||
/**
|
||||
* Kafka SASL认证端口
|
||||
*/
|
||||
private static final String SASL_PORT = "9094";
|
||||
|
||||
/**
|
||||
* Kafka SSL认证端口
|
||||
*/
|
||||
private static final String SSL_PORT = "9095";
|
||||
|
||||
/**
|
||||
* 根据连接信息端口判断认证方式。
|
||||
*
|
||||
* @param servers kafka 连接信息
|
||||
* @param properties kafka 连接配置信息
|
||||
*/
|
||||
static void chooseCert(String servers, Properties properties) {
|
||||
if (servers.contains(SASL_PORT)) {
|
||||
properties.put("security.protocol", "SASL_PLAINTEXT");
|
||||
properties.put("sasl.mechanism", "PLAIN");
|
||||
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
|
||||
+ FlowWriteConfig.KAFKA_SASL_JAAS_USER + " password=" + FlowWriteConfig.KAFKA_SASL_JAAS_PIN + ";");
|
||||
} else if (servers.contains(SSL_PORT)) {
|
||||
properties.put("security.protocol", "SSL");
|
||||
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
|
||||
properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
|
||||
properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
|
||||
properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
|
||||
properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
|
||||
properties.put("ssl.key.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
74
src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
Normal file
74
src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
Normal file
@@ -0,0 +1,74 @@
|
||||
package com.zdjizhi.utils.kafka;
|
||||
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import org.apache.flink.api.common.serialization.SimpleStringSchema;
|
||||
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
|
||||
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
|
||||
import org.apache.flink.api.common.typeutils.base.StringSerializer;
|
||||
import org.apache.flink.connector.kafka.source.KafkaSource;
|
||||
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
|
||||
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
|
||||
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.utils.kafka
|
||||
* @Description:
|
||||
* @date 2021/6/813:54
|
||||
*/
|
||||
public class KafkaConsumer {
|
||||
private static Properties createConsumerConfig() {
|
||||
Properties properties = new Properties();
|
||||
properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
|
||||
properties.put("group.id", FlowWriteConfig.GROUP_ID);
|
||||
properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
|
||||
properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
|
||||
properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
|
||||
properties.put("partition.discovery.interval.ms", "10000");
|
||||
CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties);
|
||||
|
||||
return properties;
|
||||
}
|
||||
|
||||
/**
|
||||
* 用户序列化kafka数据,增加 kafka Timestamp内容。
|
||||
*
|
||||
* @return kafka logs -> map
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer() {
|
||||
FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
|
||||
new TimestampDeserializationSchema(), createConsumerConfig());
|
||||
|
||||
//随着checkpoint提交,将offset提交到kafka
|
||||
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
|
||||
|
||||
//从消费组当前的offset开始消费
|
||||
kafkaConsumer.setStartFromGroupOffsets();
|
||||
|
||||
return kafkaConsumer;
|
||||
}
|
||||
|
||||
/**
|
||||
* 官方序列化kafka数据
|
||||
*
|
||||
* @return kafka logs
|
||||
*/
|
||||
public static FlinkKafkaConsumer<String> flinkConsumer() {
|
||||
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
|
||||
new SimpleStringSchema(), createConsumerConfig());
|
||||
|
||||
//随着checkpoint提交,将offset提交到kafka
|
||||
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
|
||||
|
||||
//从消费组当前的offset开始消费
|
||||
kafkaConsumer.setStartFromGroupOffsets();
|
||||
|
||||
return kafkaConsumer;
|
||||
}
|
||||
}
|
||||
82
src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
Normal file
82
src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
Normal file
@@ -0,0 +1,82 @@
|
||||
package com.zdjizhi.utils.kafka;
|
||||
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import org.apache.flink.api.common.serialization.SimpleStringSchema;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.utils.kafka
|
||||
* @Description:
|
||||
* @date 2021/6/814:04
|
||||
*/
|
||||
public class KafkaProducer {
|
||||
|
||||
|
||||
private static Properties createPercentProducerConfig() {
|
||||
Properties properties = new Properties();
|
||||
properties.put("bootstrap.servers", FlowWriteConfig.PERCENT_SINK_KAFKA_SERVERS);
|
||||
properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
|
||||
properties.put("retries", FlowWriteConfig.RETRIES);
|
||||
properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
|
||||
properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
|
||||
properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
|
||||
properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
|
||||
properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
|
||||
properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
|
||||
|
||||
CertUtils.chooseCert(FlowWriteConfig.PERCENT_SINK_KAFKA_SERVERS, properties);
|
||||
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static Properties createTrafficFileMetaProducerConfig() {
|
||||
Properties properties = new Properties();
|
||||
properties.put("bootstrap.servers", FlowWriteConfig.FILE_DATA_SINK_KAFKA_SERVERS);
|
||||
properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
|
||||
properties.put("retries", FlowWriteConfig.RETRIES);
|
||||
properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
|
||||
properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
|
||||
properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
|
||||
properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
|
||||
properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
|
||||
properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
|
||||
|
||||
CertUtils.chooseCert(FlowWriteConfig.FILE_DATA_SINK_KAFKA_SERVERS, properties);
|
||||
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public static FlinkKafkaProducer<String> getPercentKafkaProducer() {
|
||||
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
|
||||
FlowWriteConfig.PERCENT_KAFKA_TOPIC,
|
||||
new SimpleStringSchema(),
|
||||
createPercentProducerConfig(), Optional.empty());
|
||||
|
||||
kafkaProducer.setLogFailuresOnly(false);
|
||||
|
||||
// kafkaProducer.setWriteTimestampToKafka(true);
|
||||
|
||||
return kafkaProducer;
|
||||
}
|
||||
public static FlinkKafkaProducer<String> getTrafficFileMetaKafkaProducer() {
|
||||
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
|
||||
FlowWriteConfig.FILE_DATA_SINK_KAFKA_TOPIC,
|
||||
new SimpleStringSchema(),
|
||||
createTrafficFileMetaProducerConfig(), Optional.empty());
|
||||
|
||||
kafkaProducer.setLogFailuresOnly(false);
|
||||
|
||||
// kafkaProducer.setWriteTimestampToKafka(true);
|
||||
|
||||
return kafkaProducer;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package com.zdjizhi.utils.kafka;
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.zdjizhi.common.FlowWriteConfig;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
* @Package com.zdjizhi.utils.kafka
|
||||
* @Description:
|
||||
* @date 2022/3/89:42
|
||||
*/
|
||||
public class TimestampDeserializationSchema implements KafkaDeserializationSchema {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
@Override
|
||||
public TypeInformation getProducedType() {
|
||||
return TypeInformation.of(Map.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEndOfStream(Object nextElement) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public Map<String, Object> deserialize(ConsumerRecord record) throws Exception {
|
||||
if (record != null) {
|
||||
try {
|
||||
long timestamp = record.timestamp() / 1000;
|
||||
String value = new String((byte[]) record.value(), FlowWriteConfig.ENCODING);
|
||||
Map<String, Object> json = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
|
||||
json.put("common_ingestion_time", timestamp);
|
||||
return json;
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user