1: Add a Kafka deserialization schema class used to obtain the timestamp at which a log was written to Kafka. TSG-9844

2: Remove the Kafka authentication type setting; the authentication method is now determined from the connection port.
3: Remove the strict-match (strong validation) mode; weak validation and no validation are sufficient for current needs.
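For orientation, a minimal sketch of the enrichment introduced by change 1 (the field name common_ingestion_time and the millisecond-to-second conversion are taken from the TimestampDeserializationSchema added in this commit; the class name, log field, and timestamp value below are made up):

import java.util.HashMap;
import java.util.Map;

public class IngestionTimeSketch {
    public static void main(String[] args) {
        long recordTimestampMillis = 1646722708123L;       // what ConsumerRecord.timestamp() would return
        Map<String, Object> log = new HashMap<>();
        log.put("client_ip", "10.0.0.1");                  // hypothetical field parsed from the JSON log
        log.put("common_ingestion_time", recordTimestampMillis / 1000);  // stored in seconds, as in deserialize()
        System.out.println(log);  // e.g. {client_ip=10.0.0.1, common_ingestion_time=1646722708}
    }
}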
This commit is contained in:
qidaijie
2022-03-08 15:18:28 +08:00
parent 53b07555f2
commit daea0a07a1
15 changed files with 173 additions and 289 deletions

View File

@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId>
<version>220209-ipLookup</version>
<version>220308-IngestionTime</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>

View File

@@ -1,4 +1,4 @@
#====================Kafka Consumer====================#
#====================Kafka KafkaConsumer====================#
#kafka source connection timeout
session.timeout.ms=60000
@@ -7,7 +7,7 @@ max.poll.records=3000
#kafka source poll bytes
max.partition.fetch.bytes=31457280
#====================Kafka Producer====================#
#====================Kafka KafkaProducer====================#
#Number of producer retries
retries=0
@@ -28,12 +28,6 @@ buffer.memory=134217728
#10M
max.request.size=10485760
#====================kafka default====================#
#kafka source protocol; SSL or SASL
kafka.source.protocol=SASL
#kafka sink protocol; SSL or SASL
kafka.sink.protocol=SASL
#Kafka SASL authentication username
kafka.user=admin
@@ -47,8 +41,8 @@ hbase.table.name=tsg_galaxy:relation_framedip_account
#Default mail charset
mail.default.charset=UTF-8
#0: no validation; 1: strict type validation; 2: weak type validation
log.transform.type=2
#0: no validation; 1: weak type validation
log.transform.type=0
#Maximum time between two outputs (milliseconds)
buffer.timeout=5000

View File

@@ -1,23 +1,23 @@
#--------------------------------Address configuration------------------------------#
#Source Kafka address
source.kafka.servers=192.168.44.11:9094
source.kafka.servers=192.168.44.12:9094
#Sink Kafka address
sink.kafka.servers=192.168.44.11:9094
sink.kafka.servers=192.168.44.12:9094
#ZooKeeper address, used for log_id configuration
zookeeper.servers=192.168.44.11:2181
zookeeper.servers=192.168.44.12:2181
#HBase ZooKeeper address, used to connect to HBase
hbase.zookeeper.servers=192.168.44.11:2181
hbase.zookeeper.servers=192.168.44.12:2181
#--------------------------------HTTP / location library------------------------------#
#Location library path
tools.library=D:\\workerspace\\dat\\
#Gateway schema location
schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/session_record
schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/session_record
#Gateway APP_ID retrieval endpoint
app.id.http=http://192.168.44.67:9999/open-api/appDicList
@@ -31,7 +31,7 @@ source.kafka.topic=test
sink.kafka.topic=test-result
#Consumer group for reading the topic; offsets are stored under this id (it can be named after the topology), and the stored offset position determines where the next read resumes so data is not consumed twice
group.id=flink-test-1
group.id=flinktest-1
#Producer compression type: none or snappy
producer.kafka.compression.type=none

View File

@@ -52,8 +52,6 @@ public class FlowWriteConfig {
public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack");
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
public static final String KAFKA_SOURCE_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.source.protocol");
public static final String KAFKA_SINK_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.sink.protocol");
public static final String KAFKA_USER = FlowWriteConfigurations.getStringProperty(1, "kafka.user");
public static final String KAFKA_PIN = FlowWriteConfigurations.getStringProperty(1, "kafka.pin");

View File

@@ -5,14 +5,15 @@ import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.functions.FilterNullFunction;
import com.zdjizhi.utils.functions.MapCompletedFunction;
import com.zdjizhi.utils.functions.ObjectCompletedFunction;
import com.zdjizhi.utils.functions.TypeMapCompletedFunction;
import com.zdjizhi.utils.kafka.Consumer;
import com.zdjizhi.utils.kafka.Producer;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.topology
@@ -25,56 +26,48 @@ public class LogFlowWriteTopology {
public static void main(String[] args) {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//Enable checkpointing; the interval specifies how often checkpoints are triggered (milliseconds)
// environment.enableCheckpointing(5000);
//Maximum time between two outputs (milliseconds)
environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
DataStreamSource<Map<String, Object>> streamSource = environment.addSource(KafkaConsumer.myDeserializationConsumer())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
DataStream<String> cleaningLog;
switch (FlowWriteConfig.LOG_TRANSFORM_TYPE) {
case 0:
//Process, complete, and transform the raw log without validating log field types.
cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
break;
case 1:
//Process, complete, and transform the raw log, strictly requiring log field types to match the schema.
cleaningLog = streamSource.map(new ObjectCompletedFunction()).name("ObjectCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
break;
case 2:
//Process, complete, and transform the raw log, applying weak type validation that casts fields according to the schema.
cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
break;
default:
//Process, complete, and transform the raw log without validating log field types.
cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
}
// //Filter out empty data so it is not sent to Kafka
//Filter out empty data so it is not sent to Kafka
DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//Send data to Kafka
result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
result.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka")
.setParallelism(FlowWriteConfig.SINK_PARALLELISM);
} else {
DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.flinkConsumer())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
//Filter out empty data so it is not sent to Kafka
DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//Send data to Kafka
result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
result.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka")
.setParallelism(FlowWriteConfig.SINK_PARALLELISM);
}

View File

@@ -4,6 +4,8 @@ package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.general.TransFormMap;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
/**
* @author qidaijie
@@ -11,11 +13,11 @@ import org.apache.flink.api.common.functions.MapFunction;
* @Description:
* @date 2021/5/2715:01
*/
public class MapCompletedFunction implements MapFunction<String, String> {
public class MapCompletedFunction implements MapFunction<Map<String, Object>, String> {
@Override
@SuppressWarnings("unchecked")
public String map(String logs) {
public String map(Map<String, Object> logs) {
return TransFormMap.dealCommonMessage(logs);
}
}

View File

@@ -1,20 +0,0 @@
package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.general.TransFormObject;
import org.apache.flink.api.common.functions.MapFunction;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
* @date 2021/5/2715:01
*/
public class ObjectCompletedFunction implements MapFunction<String, String> {
@Override
@SuppressWarnings("unchecked")
public String map(String logs) {
return TransFormObject.dealCommonMessage(logs);
}
}

View File

@@ -3,6 +3,8 @@ package com.zdjizhi.utils.functions;
import com.zdjizhi.utils.general.TransFormTypeMap;
import org.apache.flink.api.common.functions.MapFunction;
import java.util.Map;
/**
* @author qidaijie
@@ -10,11 +12,11 @@ import org.apache.flink.api.common.functions.MapFunction;
* @Description:
* @date 2021/5/2715:01
*/
public class TypeMapCompletedFunction implements MapFunction<String, String> {
public class TypeMapCompletedFunction implements MapFunction<Map<String, Object>, String> {
@Override
@SuppressWarnings("unchecked")
public String map(String logs) {
public String map(Map<String, Object> logs) {
return TransFormTypeMap.dealCommonMessage(logs);
}

View File

@@ -30,34 +30,29 @@ public class TransFormMap {
/**
* Parse the log and complete missing fields
*
* @param message raw log from the Kafka topic
* @param jsonMap raw log consumed from the Kafka topic and parsed into a map
* @return the completed log
*/
@SuppressWarnings("unchecked")
public static String dealCommonMessage(String message) {
public static String dealCommonMessage(Map<String, Object> jsonMap) {
try {
if (StringUtil.isNotBlank(message)) {
Map<String, Object> jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
JsonParseUtil.dropJsonField(jsonMap);
for (String[] strings : jobList) {
//value of the referenced parameter
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
//key of the field to be completed
String appendToKeyName = strings[1];
//current value of the field to be completed
Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
//field that selects the operation function
String function = strings[2];
//value of the extra parameter
String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
}
return JsonMapper.toJsonString(jsonMap);
} else {
return null;
JsonParseUtil.dropJsonField(jsonMap);
for (String[] strings : jobList) {
//value of the referenced parameter
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
//key of the field to be completed
String appendToKeyName = strings[1];
//current value of the field to be completed
Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
//field that selects the operation function
String function = strings[2];
//value of the extra parameter
String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
}
return JsonMapper.toJsonString(jsonMap);
} catch (RuntimeException e) {
logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
logger.error("TransForm logs failed,The exception is :" + e.getMessage());
return null;
}
}

View File

@@ -1,153 +0,0 @@
package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
import java.util.ArrayList;
import java.util.HashMap;
/**
* Description: utility class for transforming and completing logs
*
* @author qidaijie
*/
public class TransFormObject {
private static final Log logger = LogFactory.get();
/**
* Map used to load the reflected class in memory
*/
private static HashMap<String, Class> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
/**
* Reflected into a class instance
*/
private static Object mapObject = JsonParseUtil.generateObject(map);
/**
* Get the job list
* Each element of the list is a four-element string array (field carrying the format marker, field to be completed, function to apply, extra parameter), for example:
* (mail_subject mail_subject decode_of_base64 mail_subject_charset)
*/
private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
/**
* Parse the log and complete missing fields
*
* @param message raw log from the Kafka topic
* @return the completed log
*/
public static String dealCommonMessage(String message) {
try {
if (StringUtil.isNotBlank(message)) {
Object object = JsonMapper.fromJsonString(message, mapObject.getClass());
for (String[] strings : jobList) {
//value of the referenced parameter
Object name = JsonParseUtil.getValue(object, strings[0]);
//key of the field to be completed
String appendToKeyName = strings[1];
//current value of the field to be completed
Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
//field that selects the operation function
String function = strings[2];
//value of the extra parameter
String param = strings[3];
functionSet(function, object, appendToKeyName, appendTo, name, param);
}
return JsonMapper.toJsonString(object);
} else {
return null;
}
} catch (RuntimeException e) {
logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
return null;
}
}
/**
* Set of functions that operate on fields according to the schema description
*
* @param function field that selects the operation function
* @param object dynamic POJO object
* @param appendToKeyName key of the field to be completed
* @param appendTo current value of the field to be completed
* @param name value of the referenced parameter
* @param param value of the extra parameter
*/
private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) {
switch (function) {
case "current_timestamp":
if (!(appendTo instanceof Long)) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime());
}
break;
case "snowflake_id":
JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId());
break;
case "geo_ip_detail":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(name.toString()));
}
break;
case "geo_asn":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(name.toString()));
}
break;
case "geo_ip_country":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(name.toString()));
}
break;
case "set_value":
if (name != null && param != null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.setValue(param));
}
break;
case "get_value":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, name);
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.condition(object, param));
}
break;
case "sub_domain":
if (appendTo == null && name != null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getTopDomain(name.toString()));
}
break;
case "radius_match":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString()));
}
break;
case "decode_of_base64":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param)));
}
break;
case "flattenSpec":
if (name != null && param != null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), param));
}
break;
case "app_match":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString()));
}
break;
default:
}
}
}

View File

@@ -39,31 +39,25 @@ public class TransFormTypeMap {
* @return the completed log
*/
@SuppressWarnings("unchecked")
public static String dealCommonMessage(String message) {
public static String dealCommonMessage(Map<String, Object> message) {
try {
if (StringUtil.isNotBlank(message)) {
Map<String, Object> map = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
Map<String, Object> jsonMap = JsonTypeUtils.typeTransform(map);
for (String[] strings : jobList) {
//value of the referenced parameter
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
//key of the field to be completed
String appendToKeyName = strings[1];
//current value of the field to be completed
Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
//field that selects the operation function
String function = strings[2];
//value of the extra parameter
String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
}
return JsonMapper.toJsonString(jsonMap);
} else {
return null;
Map<String, Object> jsonMap = JsonTypeUtils.typeTransform(message);
for (String[] strings : jobList) {
//value of the referenced parameter
Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
//key of the field to be completed
String appendToKeyName = strings[1];
//current value of the field to be completed
Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
//field that selects the operation function
String function = strings[2];
//value of the extra parameter
String param = strings[3];
functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
}
return JsonMapper.toJsonString(jsonMap);
} catch (RuntimeException e) {
logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
logger.error("TransForm logs failed,The exception is :" + e.getMessage());
return null;
}
}

View File

@@ -12,24 +12,36 @@ import java.util.Properties;
* @date 2021/9/610:37
*/
class CertUtils {
static void chooseCert(String type, Properties properties) {
switch (type) {
case "SSL":
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_PIN);
properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_PIN);
properties.put("ssl.key.password", FlowWriteConfig.KAFKA_PIN);
break;
case "SASL":
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ FlowWriteConfig.KAFKA_USER + " password=" + FlowWriteConfig.KAFKA_PIN + ";");
break;
default:
/**
* Kafka SASL authentication port
*/
private static final String SASL_PORT = "9094";
/**
* Kafka SSL authentication port
*/
private static final String SSL_PORT = "9095";
/**
* Determine the authentication method from the port in the connection string.
*
* @param servers Kafka bootstrap connection string
* @param properties Kafka connection properties
*/
static void chooseCert(String servers, Properties properties) {
if (servers.contains(SASL_PORT)) {
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ FlowWriteConfig.KAFKA_USER + " password=" + FlowWriteConfig.KAFKA_PIN + ";");
} else if (servers.contains(SSL_PORT)) {
properties.put("security.protocol", "SSL");
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_PIN);
properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_PIN);
properties.put("ssl.key.password", FlowWriteConfig.KAFKA_PIN);
}
}
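A minimal usage sketch of the new port-based selection (the server strings below are examples; ports 9094 and 9095 come from the constants above, and the wrapper class CertUtilsUsageSketch is hypothetical — it only works from the same package because chooseCert is package-private, as in KafkaConsumer and KafkaProducer):

package com.zdjizhi.utils.kafka;

import java.util.Properties;

class CertUtilsUsageSketch {
    public static void main(String[] args) {
        Properties sasl = new Properties();
        CertUtils.chooseCert("192.168.44.12:9094", sasl);   // port 9094 -> SASL_PLAINTEXT / PLAIN credentials
        Properties ssl = new Properties();
        CertUtils.chooseCert("192.168.44.12:9095", ssl);    // port 9095 -> SSL keystore / truststore settings
        System.out.println(sasl.getProperty("security.protocol"));  // SASL_PLAINTEXT
        System.out.println(ssl.getProperty("security.protocol"));   // SSL
    }
}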

View File

@@ -1,14 +1,14 @@
package com.zdjizhi.utils.kafka;
import com.sun.tools.javac.comp.Flow;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import java.util.Map;
import java.util.Properties;
/**
@@ -17,7 +17,7 @@ import java.util.Properties;
* @Description:
* @date 2021/6/813:54
*/
public class Consumer {
public class KafkaConsumer {
private static Properties createConsumerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
@@ -27,12 +27,33 @@ public class Consumer {
properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
CertUtils.chooseCert(FlowWriteConfig.KAFKA_SOURCE_PROTOCOL,properties);
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties);
return properties;
}
public static FlinkKafkaConsumer<String> getKafkaConsumer() {
/**
* Custom deserialization of Kafka data, adding the Kafka timestamp to each record
*
* @return kafka logs -> map
*/
public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer() {
FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
new TimestampDeserializationSchema(), createConsumerConfig());
kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
}
/**
* Standard (built-in) deserialization of Kafka data
*
* @return kafka logs
*/
public static FlinkKafkaConsumer<String> flinkConsumer() {
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
new SimpleStringSchema(), createConsumerConfig());

View File

@@ -15,7 +15,7 @@ import java.util.Properties;
* @Description:
* @date 2021/6/814:04
*/
public class Producer {
public class KafkaProducer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
@@ -29,7 +29,7 @@ public class Producer {
properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
CertUtils.chooseCert(FlowWriteConfig.KAFKA_SINK_PROTOCOL, properties);
CertUtils.chooseCert(FlowWriteConfig.SINK_KAFKA_SERVERS, properties);
return properties;
}
@@ -43,7 +43,6 @@ public class Producer {
kafkaProducer.setLogFailuresOnly(false);
// kafkaProducer.setWriteTimestampToKafka(true);
return kafkaProducer;

View File

@@ -0,0 +1,47 @@
package com.zdjizhi.utils.kafka;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.JsonMapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Map;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.kafka
* @Description:
* @date 2022/3/89:42
*/
public class TimestampDeserializationSchema implements KafkaDeserializationSchema {
private static final Log logger = LogFactory.get();
private final String ENCODING = "UTF8";
@Override
public boolean isEndOfStream(Object nextElement) {
return false;
}
@Override
public Map<String, Object> deserialize(ConsumerRecord record) throws Exception {
if (record != null) {
try {
long timestamp = record.timestamp() / 1000;
String value = new String((byte[]) record.value(), ENCODING);
Map<String, Object> json = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
json.put("common_ingestion_time", timestamp);
return json;
} catch (RuntimeException e) {
logger.error("KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());
}
}
return null;
}
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(Map.class);
}
}