diff --git a/pom.xml b/pom.xml
index 217901d..69d563f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
   <groupId>com.zdjizhi</groupId>
   <artifactId>log-completion-schema</artifactId>
-  <version>220209-ipLookup</version>
+  <version>220308-IngestionTime</version>
 
   <name>log-completion-schema</name>
   <url>http://www.example.com</url>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index ebf7927..6a01de4 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -1,4 +1,4 @@
-#====================Kafka Consumer====================#
+#====================Kafka KafkaConsumer====================#
 
 #kafka source connection timeout
 session.timeout.ms=60000
@@ -7,7 +7,7 @@ max.poll.records=3000
 #kafka source poll bytes
 max.partition.fetch.bytes=31457280
 
-#====================Kafka Producer====================#
+#====================Kafka KafkaProducer====================#
 
 #producer重试的次数设置
 retries=0
@@ -28,12 +28,6 @@ buffer.memory=134217728
 #10M
 max.request.size=10485760
 
 #====================kafka default====================#
-#kafka source protocol; SSL or SASL
-kafka.source.protocol=SASL
-
-#kafka sink protocol; SSL or SASL
-kafka.sink.protocol=SASL
-
 #kafka SASL验证用户名
 kafka.user=admin
@@ -47,8 +41,8 @@ hbase.table.name=tsg_galaxy:relation_framedip_account
 #邮件默认编码
 mail.default.charset=UTF-8
 
-#0不做任何校验,1强类型校验,2弱类型校验
-log.transform.type=2
+#0不做任何校验,1弱类型校验
+log.transform.type=0
 
 #两个输出之间的最大时间(单位milliseconds)
 buffer.timeout=5000
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index ddd10f6..df12fa7 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,23 +1,23 @@
 #--------------------------------地址配置------------------------------#
 #管理kafka地址
-source.kafka.servers=192.168.44.11:9094
+source.kafka.servers=192.168.44.12:9094
 
 #管理输出kafka地址
-sink.kafka.servers=192.168.44.11:9094
+sink.kafka.servers=192.168.44.12:9094
 
 #zookeeper 地址 用于配置log_id
-zookeeper.servers=192.168.44.11:2181
+zookeeper.servers=192.168.44.12:2181
 
 #hbase zookeeper地址 用于连接HBase
-hbase.zookeeper.servers=192.168.44.11:2181
+hbase.zookeeper.servers=192.168.44.12:2181
 
 #--------------------------------HTTP/定位库------------------------------#
 
 #定位库地址
 tools.library=D:\\workerspace\\dat\\
 
 #网关的schema位置
-schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/session_record
+schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/session_record
 
 #网关APP_ID 获取接口
 app.id.http=http://192.168.44.67:9999/open-api/appDicList
@@ -31,7 +31,7 @@ source.kafka.topic=test
 sink.kafka.topic=test-result
 
 #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=flink-test-1
+group.id=flinktest-1
 
 #生产者压缩模式 none or snappy
 producer.kafka.compression.type=none
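Note on the config change above: kafka.source.protocol / kafka.sink.protocol are removed (authentication is now derived from the broker port, see the CertUtils change further down), and log.transform.type is reduced to two values: 0 = no field-type validation, 1 = weak validation with schema-based casting; any other value falls back to 0. A minimal sketch of that mapping, mirroring the switch in LogFlowWriteTopology below (the class name is illustrative only):

// Sketch only: how the new log.transform.type values map to transform functions
// in LogFlowWriteTopology (0 and the default case fall back to MapCompletedFunction).
public class TransformTypeSketch {
    public static void main(String[] args) {
        int logTransformType = 0;   // value of log.transform.type in default_config.properties
        switch (logTransformType) {
            case 0:
                System.out.println("MapCompletedFunction: no field-type validation");
                break;
            case 1:
                System.out.println("TypeMapCompletedFunction: weak validation, cast by schema");
                break;
            default:
                System.out.println("MapCompletedFunction: fallback");
        }
    }
}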
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index e2d430a..ebc8eeb 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -52,8 +52,6 @@ public class FlowWriteConfig {
     public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack");
     public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
     public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
-    public static final String KAFKA_SOURCE_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.source.protocol");
-    public static final String KAFKA_SINK_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.sink.protocol");
     public static final String KAFKA_USER = FlowWriteConfigurations.getStringProperty(1, "kafka.user");
     public static final String KAFKA_PIN = FlowWriteConfigurations.getStringProperty(1, "kafka.pin");
 
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index 07e0407..2d42769 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -5,14 +5,15 @@ import cn.hutool.log.LogFactory;
 import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.utils.functions.FilterNullFunction;
 import com.zdjizhi.utils.functions.MapCompletedFunction;
-import com.zdjizhi.utils.functions.ObjectCompletedFunction;
 import com.zdjizhi.utils.functions.TypeMapCompletedFunction;
-import com.zdjizhi.utils.kafka.Consumer;
-import com.zdjizhi.utils.kafka.Producer;
+import com.zdjizhi.utils.kafka.KafkaConsumer;
+import com.zdjizhi.utils.kafka.KafkaProducer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import java.util.Map;
+
 /**
  * @author qidaijie
  * @Package com.zdjizhi.topology
@@ -25,56 +26,48 @@ public class LogFlowWriteTopology {
 
     public static void main(String[] args) {
         final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-        //开启Checkpoint,interval用于指定checkpoint的触发间隔(单位milliseconds)
-//        environment.enableCheckpointing(5000);
-
         //两个输出之间的最大时间 (单位milliseconds)
         environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
 
-        DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
-                .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
-
         if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
+            DataStreamSource<Map<String, Object>> streamSource = environment.addSource(KafkaConsumer.myDeserializationConsumer())
+                    .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
+
             DataStream<String> cleaningLog;
             switch (FlowWriteConfig.LOG_TRANSFORM_TYPE) {
                 case 0:
                     //对原始日志进行处理补全转换等,不对日志字段类型做校验。
                     cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
                             .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
                     break;
                 case 1:
-                    //对原始日志进行处理补全转换等,强制要求日志字段类型与schema一致。
-                    cleaningLog = streamSource.map(new ObjectCompletedFunction()).name("ObjectCompletedFunction")
-                            .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
-                    break;
-                case 2:
                     //对原始日志进行处理补全转换等,对日志字段类型做弱校验,可根据schema进行强转。
                     cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction")
                             .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
                     break;
                 default:
                     //对原始日志进行处理补全转换等,不对日志字段类型做校验。
                     cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
                             .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
             }
 
-//            //过滤空数据不发送到Kafka内
+            //过滤空数据不发送到Kafka内
             DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData")
                     .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
 
             //发送数据到Kafka
-            result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
+            result.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka")
                     .setParallelism(FlowWriteConfig.SINK_PARALLELISM);
         } else {
+            DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.flinkConsumer())
+                    .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
+
             //过滤空数据不发送到Kafka内
             DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData")
                     .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+
             //发送数据到Kafka
-            result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
+            result.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka")
                     .setParallelism(FlowWriteConfig.SINK_PARALLELISM);
         }
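In both branches above the transform functions return null for records that cannot be completed, and FilterNullFunction drops those nulls before the Kafka sink. The real FilterNullFunction is not part of this patch; a minimal sketch of the behaviour it is assumed to have (class name illustrative):

import org.apache.flink.api.common.functions.FilterFunction;

// Sketch only: assumed behaviour of FilterNullFunction, which this patch does not modify.
public class FilterNullFunctionSketch implements FilterFunction<String> {
    @Override
    public boolean filter(String value) {
        // Keep only non-null, non-empty records; dealCommonMessage returns null on failure.
        return value != null && !value.isEmpty();
    }
}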
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
index 5e5d0b7..810e4c8 100644
--- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
@@ -4,6 +4,8 @@ package com.zdjizhi.utils.functions;
 
 import com.zdjizhi.utils.general.TransFormMap;
 import org.apache.flink.api.common.functions.MapFunction;
+import java.util.Map;
+
 
 /**
  * @author qidaijie
@@ -11,11 +13,11 @@ import org.apache.flink.api.common.functions.MapFunction;
  * @Description:
  * @date 2021/5/2715:01
  */
-public class MapCompletedFunction implements MapFunction<String, String> {
+public class MapCompletedFunction implements MapFunction<Map<String, Object>, String> {
 
     @Override
     @SuppressWarnings("unchecked")
-    public String map(String logs) {
+    public String map(Map<String, Object> logs) {
         return TransFormMap.dealCommonMessage(logs);
     }
 }
diff --git a/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java
deleted file mode 100644
index 131d2f6..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import com.zdjizhi.utils.general.TransFormObject;
-import org.apache.flink.api.common.functions.MapFunction;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class ObjectCompletedFunction implements MapFunction<String, String> {
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public String map(String logs) {
-        return TransFormObject.dealCommonMessage(logs);
-    }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
index 99c92e8..ccef850 100644
--- a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
@@ -3,6 +3,8 @@ package com.zdjizhi.utils.functions;
 
 import com.zdjizhi.utils.general.TransFormTypeMap;
 import org.apache.flink.api.common.functions.MapFunction;
+import java.util.Map;
+
 
 /**
  * @author qidaijie
@@ -10,11 +12,11 @@ import org.apache.flink.api.common.functions.MapFunction;
  * @Description:
  * @date 2021/5/2715:01
  */
-public class TypeMapCompletedFunction implements MapFunction<String, String> {
+public class TypeMapCompletedFunction implements MapFunction<Map<String, Object>, String> {
 
     @Override
     @SuppressWarnings("unchecked")
-    public String map(String logs) {
+    public String map(Map<String, Object> logs) {
         return TransFormTypeMap.dealCommonMessage(logs);
     }
 }
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
index 5ae9859..de4ca99 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
@@ -30,34 +30,29 @@ public class TransFormMap {
     /**
      * 解析日志,并补全
      *
-     * @param message kafka Topic原始日志
+     * @param jsonMap kafka Topic消费原始日志并解析
      * @return 补全后的日志
      */
     @SuppressWarnings("unchecked")
-    public static String dealCommonMessage(String message) {
+    public static String dealCommonMessage(Map<String, Object> jsonMap) {
         try {
-            if (StringUtil.isNotBlank(message)) {
-                Map<String, Object> jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
-                JsonParseUtil.dropJsonField(jsonMap);
-                for (String[] strings : jobList) {
-                    //用到的参数的值
-                    Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
-                    //需要补全的字段的key
-                    String appendToKeyName = strings[1];
-                    //需要补全的字段的值
-                    Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
-                    //匹配操作函数的字段
-                    String function = strings[2];
-                    //额外的参数的值
-                    String param = strings[3];
-                    functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
-                }
-                return JsonMapper.toJsonString(jsonMap);
-            } else {
-                return null;
+            JsonParseUtil.dropJsonField(jsonMap);
+            for (String[] strings : jobList) {
+                //用到的参数的值
+                Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+                //需要补全的字段的key
+                String appendToKeyName = strings[1];
+                //需要补全的字段的值
+                Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+                //匹配操作函数的字段
+                String function = strings[2];
+                //额外的参数的值
+                String param = strings[3];
+                functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
             }
+            return JsonMapper.toJsonString(jsonMap);
         } catch (RuntimeException e) {
-            logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
+            logger.error("TransForm logs failed, the exception is: " + e.getMessage());
             return null;
         }
     }
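For context on the loop above: each jobList entry is a four-element String array — (source field, field to complete, function name, extra parameter) — and functionSet dispatches on the third element. The example tuple below is taken from the javadoc of the removed TransFormObject class; the map contents are made up, and the inline Base64 decoding is only a simplified stand-in for TransFunction.decodeBase64:

import java.util.HashMap;
import java.util.Map;

// Sketch only: illustrates one (sourceField, targetField, function, param) job tuple;
// the real lookups go through JsonParseUtil and TransFunction.
public class JobListSketch {
    public static void main(String[] args) {
        String[] job = {"mail_subject", "mail_subject", "decode_of_base64", "mail_subject_charset"};

        Map<String, Object> log = new HashMap<>();
        log.put("mail_subject", "aGVsbG8=");        // Base64 for "hello"
        log.put("mail_subject_charset", "UTF-8");

        Object logValue = log.get(job[0]);   // value used as function input
        String appendToKeyName = job[1];     // field to be completed
        String function = job[2];            // selects the branch in functionSet
        String param = job[3];               // extra parameter (here: the charset field)

        if ("decode_of_base64".equals(function) && logValue != null) {
            String decoded = new String(
                    java.util.Base64.getDecoder().decode(logValue.toString()),
                    java.nio.charset.Charset.forName(log.get(param).toString()));
            log.put(appendToKeyName, decoded);
        }
        System.out.println(log.get("mail_subject")); // hello
    }
}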
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java
deleted file mode 100644
index 54629db..0000000
--- a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package com.zdjizhi.utils.general;
-
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-
-/**
- * 描述:转换或补全工具类
- *
- * @author qidaijie
- */
-public class TransFormObject {
-    private static final Log logger = LogFactory.get();
-
-    /**
-     * 在内存中加载反射类用的map
-     */
-    private static HashMap map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
-
-    /**
-     * 反射成一个类
-     */
-    private static Object mapObject = JsonParseUtil.generateObject(map);
-
-    /**
-     * 获取任务列表
-     * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
-     * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
-     */
-    private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
-
-    /**
-     * 解析日志,并补全
-     *
-     * @param message kafka Topic原始日志
-     * @return 补全后的日志
-     */
-    public static String dealCommonMessage(String message) {
-        try {
-            if (StringUtil.isNotBlank(message)) {
-                Object object = JsonMapper.fromJsonString(message, mapObject.getClass());
-                for (String[] strings : jobList) {
-                    //用到的参数的值
-                    Object name = JsonParseUtil.getValue(object, strings[0]);
-                    //需要补全的字段的key
-                    String appendToKeyName = strings[1];
-                    //需要补全的字段的值
-                    Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
-                    //匹配操作函数的字段
-                    String function = strings[2];
-                    //额外的参数的值
-                    String param = strings[3];
-                    functionSet(function, object, appendToKeyName, appendTo, name, param);
-                }
-                return JsonMapper.toJsonString(object);
-            } else {
-                return null;
-            }
-        } catch (RuntimeException e) {
-            logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
-            return null;
-        }
-    }
-
-
-    /**
-     * 根据schema描述对应字段进行操作的 函数集合
-     *
-     * @param function        匹配操作函数的字段
-     * @param object          动态POJO Object
-     * @param appendToKeyName 需要补全的字段的key
-     * @param appendTo        需要补全的字段的值
-     * @param name            用到的参数的值
-     * @param param           额外的参数的值
-     */
-    private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) {
-        switch (function) {
-            case "current_timestamp":
-                if (!(appendTo instanceof Long)) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime());
-                }
-                break;
-            case "snowflake_id":
-                JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId());
-                break;
-            case "geo_ip_detail":
-                if (name != null && appendTo == null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(name.toString()));
-                }
-                break;
-            case "geo_asn":
-                if (name != null && appendTo == null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(name.toString()));
-                }
-                break;
-            case "geo_ip_country":
-                if (name != null && appendTo == null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(name.toString()));
-                }
-                break;
-            case "set_value":
-                if (name != null && param != null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.setValue(param));
-                }
-                break;
-            case "get_value":
-                if (name != null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, name);
-                }
-                break;
-            case "if":
-                if (param != null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.condition(object, param));
-                }
-                break;
-            case "sub_domain":
-                if (appendTo == null && name != null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getTopDomain(name.toString()));
-                }
-                break;
-            case "radius_match":
-                if (name != null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString()));
-                }
-                break;
-            case "decode_of_base64":
-                if (name != null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param)));
-                }
-                break;
-            case "flattenSpec":
-                if (name != null && param != null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), param));
-                }
-                break;
-            case "app_match":
-                if (name != null && appendTo == null) {
-                    JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString()));
-                }
-                break;
-            default:
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
index 699470f..5b3cede 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
@@ -39,31 +39,25 @@
      * @return 补全后的日志
      */
     @SuppressWarnings("unchecked")
-    public static String dealCommonMessage(String message) {
+    public static String dealCommonMessage(Map<String, Object> message) {
         try {
-            if (StringUtil.isNotBlank(message)) {
-                Map<String, Object> map = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
-                Map<String, Object> jsonMap = JsonTypeUtils.typeTransform(map);
-                for (String[] strings : jobList) {
-                    //用到的参数的值
-                    Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
-                    //需要补全的字段的key
-                    String appendToKeyName = strings[1];
-                    //需要补全的字段的值
-                    Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
-                    //匹配操作函数的字段
-                    String function = strings[2];
-                    //额外的参数的值
-                    String param = strings[3];
-                    functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
-                }
-                return JsonMapper.toJsonString(jsonMap);
-
-            } else {
-                return null;
+            Map<String, Object> jsonMap = JsonTypeUtils.typeTransform(message);
+            for (String[] strings : jobList) {
+                //用到的参数的值
+                Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+                //需要补全的字段的key
+                String appendToKeyName = strings[1];
+                //需要补全的字段的值
+                Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+                //匹配操作函数的字段
+                String function = strings[2];
+                //额外的参数的值
+                String param = strings[3];
+                functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
             }
+            return JsonMapper.toJsonString(jsonMap);
         } catch (RuntimeException e) {
-            logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
+            logger.error("TransForm logs failed, the exception is: " + e.getMessage());
             return null;
         }
     }
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
index b09eedb..fe86fe7 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -12,24 +12,36 @@ import java.util.Properties;
  * @date 2021/9/610:37
  */
 class CertUtils {
-    static void chooseCert(String type, Properties properties) {
-        switch (type) {
-            case "SSL":
-                properties.put("security.protocol", "SSL");
-                properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
-                properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
-                properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_PIN);
-                properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
-                properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_PIN);
-                properties.put("ssl.key.password", FlowWriteConfig.KAFKA_PIN);
-                break;
-            case "SASL":
-                properties.put("security.protocol", "SASL_PLAINTEXT");
-                properties.put("sasl.mechanism", "PLAIN");
-                properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
-                        + FlowWriteConfig.KAFKA_USER + " password=" + FlowWriteConfig.KAFKA_PIN + ";");
-                break;
-            default:
+    /**
+     * Kafka SASL认证端口
+     */
+    private static final String SASL_PORT = "9094";
+
+    /**
+     * Kafka SSL认证端口
+     */
+    private static final String SSL_PORT = "9095";
+
+    /**
+     * 根据连接信息端口判断认证方式。
+     *
+     * @param servers    kafka 连接信息
+     * @param properties kafka 连接配置信息
+     */
+    static void chooseCert(String servers, Properties properties) {
+        if (servers.contains(SASL_PORT)) {
+            properties.put("security.protocol", "SASL_PLAINTEXT");
+            properties.put("sasl.mechanism", "PLAIN");
+            properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+                    + FlowWriteConfig.KAFKA_USER + " password=" + FlowWriteConfig.KAFKA_PIN + ";");
+        } else if (servers.contains(SSL_PORT)) {
+            properties.put("security.protocol", "SSL");
+            properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+            properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
+            properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_PIN);
+            properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
+            properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_PIN);
+            properties.put("ssl.key.password", FlowWriteConfig.KAFKA_PIN);
         }
     }
 }
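The new chooseCert infers the security protocol from the port contained in the bootstrap string instead of the removed kafka.source.protocol / kafka.sink.protocol settings: 9094 selects SASL_PLAINTEXT, 9095 selects SSL, anything else leaves the properties untouched. A minimal sketch of that branching (class name and server string are illustrative):

import java.util.Properties;

// Sketch only: mirrors the port-based branching in CertUtils.chooseCert.
public class ChooseCertSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        String servers = "192.168.44.12:9094";               // contains SASL_PORT -> SASL_PLAINTEXT
        if (servers.contains("9094")) {
            props.put("security.protocol", "SASL_PLAINTEXT");
        } else if (servers.contains("9095")) {
            props.put("security.protocol", "SSL");
        }
        System.out.println(props.getProperty("security.protocol")); // SASL_PLAINTEXT
    }
}

One design note: contains() matches anywhere in the bootstrap list, so a server string that mixes 9094 and 9095 endpoints resolves to SASL.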
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
similarity index 55%
rename from src/main/java/com/zdjizhi/utils/kafka/Consumer.java
rename to src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index 339b7e3..078c2fe 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -1,14 +1,14 @@
 package com.zdjizhi.utils.kafka;
 
-import com.sun.tools.javac.comp.Flow;
 import com.zdjizhi.common.FlowWriteConfig;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.config.SslConfigs;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -17,7 +17,7 @@ import java.util.Properties;
  * @Description:
  * @date 2021/6/813:54
  */
-public class Consumer {
+public class KafkaConsumer {
     private static Properties createConsumerConfig() {
         Properties properties = new Properties();
         properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
@@ -27,12 +27,33 @@ public class Consumer {
         properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
         properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        CertUtils.chooseCert(FlowWriteConfig.KAFKA_SOURCE_PROTOCOL,properties);
+        properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
+        CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties);
         return properties;
     }
 
-    public static FlinkKafkaConsumer getKafkaConsumer() {
+    /**
+     * 用户序列化kafka数据,增加 kafka Timestamp内容。
+     *
+     * @return kafka logs -> map
+     */
+    public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer() {
+        FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
+                new TimestampDeserializationSchema(), createConsumerConfig());
+
+        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
+        kafkaConsumer.setStartFromGroupOffsets();
+
+        return kafkaConsumer;
+    }
+
+    /**
+     * 官方序列化kafka数据
+     *
+     * @return kafka logs
+     */
+    public static FlinkKafkaConsumer<String> flinkConsumer() {
         FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
                 new SimpleStringSchema(), createConsumerConfig());
 
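A side note on the two new imports, KafkaSource and OffsetsInitializer: they are not referenced anywhere in this file yet. If the intent is an eventual migration from FlinkKafkaConsumer to the newer KafkaSource API, a minimal sketch of an equivalent plain-string source might look like the following (not part of this patch; topic, group and servers are taken from service_flow_config.properties above, the class name is illustrative):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Sketch only: possible KafkaSource equivalent of flinkConsumer(); this patch does not use it.
public class KafkaSourceSketch {
    public static KafkaSource<String> stringSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("192.168.44.12:9094")   // source.kafka.servers
                .setTopics("test")                           // source.kafka.topic
                .setGroupId("flinktest-1")                   // group.id
                .setStartingOffsets(OffsetsInitializer.committedOffsets())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}

Such a source would be wired in with environment.fromSource(...) instead of environment.addSource(...).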
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
similarity index 94%
rename from src/main/java/com/zdjizhi/utils/kafka/Producer.java
rename to src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index 1671643..f2f399d 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -15,7 +15,7 @@ import java.util.Properties;
  * @Description:
  * @date 2021/6/814:04
  */
-public class Producer {
+public class KafkaProducer {
 
     private static Properties createProducerConfig() {
         Properties properties = new Properties();
@@ -29,7 +29,7 @@ public class Producer {
         properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
         properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
-        CertUtils.chooseCert(FlowWriteConfig.KAFKA_SINK_PROTOCOL, properties);
+        CertUtils.chooseCert(FlowWriteConfig.SINK_KAFKA_SERVERS, properties);
         return properties;
     }
 
@@ -43,7 +43,6 @@ public class Producer {
 
         kafkaProducer.setLogFailuresOnly(false);
 
-//        kafkaProducer.setWriteTimestampToKafka(true);
 
         return kafkaProducer;
 
diff --git a/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
new file mode 100644
index 0000000..e978369
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/TimestampDeserializationSchema.java
@@ -0,0 +1,47 @@
+package com.zdjizhi.utils.kafka;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.JsonMapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2022/3/89:42
+ */
+public class TimestampDeserializationSchema implements KafkaDeserializationSchema {
+    private static final Log logger = LogFactory.get();
+    private final String ENCODING = "UTF8";
+
+    @Override
+    public boolean isEndOfStream(Object nextElement) {
+        return false;
+    }
+
+    @Override
+    public Map deserialize(ConsumerRecord record) throws Exception {
+        if (record != null) {
+            try {
+                long timestamp = record.timestamp() / 1000;
+                String value = new String((byte[]) record.value(), ENCODING);
+                Map json = (Map) JsonMapper.fromJsonString(value, Map.class);
+                json.put("common_ingestion_time", timestamp);
+                return json;
+            } catch (RuntimeException e) {
+                logger.error("KafkaConsumer Deserialize failed, the exception is: " + e.getMessage());
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public TypeInformation getProducedType() {
+        return TypeInformation.of(Map.class);
+    }
+}
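The new schema above attaches the Kafka record timestamp to every log before the completion functions run: record.timestamp() is epoch milliseconds, and the division by 1000 stores common_ingestion_time in epoch seconds. A minimal sketch of what a deserialized record then carries (numbers and class name are illustrative only):

import java.util.HashMap;
import java.util.Map;

// Sketch only: shape of a record produced by TimestampDeserializationSchema.
public class IngestionTimeSketch {
    public static void main(String[] args) {
        long kafkaTimestampMs = 1646703600123L;                       // record.timestamp() in milliseconds
        Map<String, Object> json = new HashMap<>();
        json.put("common_ingestion_time", kafkaTimestampMs / 1000);   // stored as epoch seconds

        // Downstream operators (e.g. MapCompletedFunction) receive the map with this field present.
        System.out.println(json.get("common_ingestion_time"));        // 1646703600
    }
}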