diff --git a/pom.xml b/pom.xml
index 42189ea..feb29c6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
 
   <groupId>com.zdjizhi</groupId>
   <artifactId>log-completion-schema</artifactId>
-  <version>211105-flattenSpec</version>
+  <version>211109-Jackson</version>
 
   <name>log-completion-schema</name>
   <url>http://www.example.com</url>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index fb9015e..01bb5ca 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -1,23 +1,4 @@
-#Number of producer retries
-retries=0
-
-#Maximum time after a batch is created before it must be sent, whether or not it is full
-linger.ms=10
-
-#If no response is received before the timeout, the client resends the request when necessary
-request.timeout.ms=30000
-
-#The producer sends records in batches; batch size, default: 16384
-batch.size=262144
-
-#Buffer size used by the producer to cache messages
-#128M
-buffer.memory=134217728
-
-#Maximum size of each request sent to the Kafka server, default 1048576
-#10M
-max.request.size=10485760
-
+#====================Kafka Consumer====================#
 #kafka source connection timeout
 session.timeout.ms=60000
@@ -26,24 +7,48 @@ max.poll.records=3000
 
 #kafka source poll bytes
 max.partition.fetch.bytes=31457280
 
+#====================Kafka Producer====================#
+#Number of producer retries
+retries=0
-#hbase table name
-hbase.table.name=subscriber_info
+#Maximum time after a batch is created before it must be sent, whether or not it is full
+linger.ms=10
-#Default mail charset
-mail.default.charset=UTF-8
+#If no response is received before the timeout, the client resends the request when necessary
+request.timeout.ms=30000
-#0 no validation, 1 strict type validation, 2 loose type validation
-log.transform.type=0
+#The producer sends records in batches; batch size, default: 16384
+batch.size=262144
+#Buffer size used by the producer to cache messages
+#128M
+buffer.memory=134217728
+
+#Maximum size of each request sent to the Kafka server, default 1048576
+#10M
+max.request.size=10485760
+#====================kafka default====================#
 #kafka source protocol; SSL or SASL
 kafka.source.protocol=SASL
 
 #kafka sink protocol; SSL or SASL
 kafka.sink.protocol=SSL
 
-#Kafka SASL authentication username
+#Kafka SASL authentication username
 kafka.user=admin
 
-#Kafka SASL and SSL authentication password
-kafka.pin=galaxy2019
\ No newline at end of file
+#Kafka SASL and SSL authentication password
+kafka.pin=galaxy2019
+#====================Topology Default====================#
+
+#hbase table name
+hbase.table.name=subscriber_info
+
+#Default mail charset
+mail.default.charset=UTF-8
+
+#0 no validation, 1 strict type validation, 2 loose type validation
+log.transform.type=2
+
+#Maximum time between two outputs (in milliseconds)
+buffer.timeout=100000
\ No newline at end of file
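Note: the keys grouped under "Kafka Producer" above are standard Kafka producer settings. A minimal sketch of how they typically map onto ProducerConfig when producer properties are built; the wiring below is illustrative only and not the project's actual Producer class (which appears later in this diff).

    // Illustrative mapping of the properties-file keys to Kafka ProducerConfig settings.
    // Values mirror the defaults configured above.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ProducerPropsSketch {
        public static Properties build() {
            Properties props = new Properties();
            props.put(ProducerConfig.RETRIES_CONFIG, "0");                 // retries
            props.put(ProducerConfig.LINGER_MS_CONFIG, "10");              // linger.ms
            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");  // request.timeout.ms
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, "262144");         // batch.size
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728");   // buffer.memory (128M)
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "10485760"); // max.request.size (10M)
            return props;
        }
    }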
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 51ecb4d..1a38ca4 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,7 +1,7 @@
 #--------------------------------Address configuration------------------------------#
 
 #Management Kafka address
-source.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
+source.kafka.servers=10.231.12.4:9094
 
 #Management output Kafka address
 sink.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
@@ -10,7 +10,7 @@ sink.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
 zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181
 
 #HBase ZooKeeper address, used to connect to HBase
-hbase.zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181
+hbase.zookeeper.servers=10.231.12.4:2181
 
 #--------------------------------HTTP / location library------------------------------#
 #Location library address
@@ -25,13 +25,13 @@ app.id.http=http://10.224.11.244:9999/open-api/appDicList
 
 #--------------------------------Kafka consumer group info------------------------------#
 
 #Kafka topic for receiving data
-source.kafka.topic=test
+source.kafka.topic=SESSION-RECORD
 
 #Output topic for completed data
 sink.kafka.topic=test-result
 
 #Consumer group id: stores the consumed offsets for this spout id and can be named after the topology; it determines where offsets are stored so the next read does not re-consume data
-group.id=flink-test
+group.id=flink-test-1
 
 #Producer compression mode, none or snappy
 producer.kafka.compression.type=none
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index e8c569c..e2d430a 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -29,6 +29,7 @@ public class FlowWriteConfig {
     public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
     public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
     public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
+    public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
 
     /**
      * kafka source config
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index b23492b..07e0407 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -28,8 +28,8 @@ public class LogFlowWriteTopology {
         //Enable checkpointing; interval specifies how often a checkpoint is triggered (in milliseconds)
 //        environment.enableCheckpointing(5000);
-        //
-        environment.setBufferTimeout(5000);
+        //Maximum time between two outputs (in milliseconds)
+        environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
 
         DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
                 .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
@@ -41,26 +41,31 @@
                 //Process, complete and transform the raw log without validating field types.
                 cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
                         .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+                break;
             case 1:
                 //Process, complete and transform the raw log; field types must match the schema.
                 cleaningLog = streamSource.map(new ObjectCompletedFunction()).name("ObjectCompletedFunction")
                         .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+                break;
             case 2:
                 //Process, complete and transform the raw log with loose type validation; values may be cast according to the schema.
                 cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction")
                         .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+                break;
             default:
                 //Process, complete and transform the raw log without validating field types.
                 cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
                         .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+
         }
-        //Filter out empty data so it is not sent to Kafka
+//        //Filter out empty data so it is not sent to Kafka
         DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData")
                 .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
 
+        //Send data to Kafka
         result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
                 .setParallelism(FlowWriteConfig.SINK_PARALLELISM);
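Note: the FilterAbnormalData step above is what keeps invalid records out of the Kafka sink now that the TransForm* utilities below return null instead of "". FilterNullFunction itself is not part of this diff, so the following is only a minimal sketch of the assumed behaviour.

    // Sketch only -- the real FilterNullFunction is not shown in this diff.
    import org.apache.flink.api.common.functions.FilterFunction;

    public class FilterNullFunction implements FilterFunction<String> {
        @Override
        public boolean filter(String value) {
            // Transform functions return null when a record cannot be completed;
            // dropping those records here prevents empty messages from reaching Kafka.
            return value != null && !value.isEmpty();
        }
    }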
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
index 239d8db..5ae9859 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
@@ -54,11 +54,11 @@ public class TransFormMap {
                 }
                 return JsonMapper.toJsonString(jsonMap);
             } else {
-                return "";
+                return null;
             }
         } catch (RuntimeException e) {
             logger.error("Exception while parsing and completing log message, details: " + e + "\n" + message);
-            return "";
+            return null;
         }
     }
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java
index 9b776a9..54629db 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java
@@ -62,11 +62,11 @@ public class TransFormObject {
                 }
                 return JsonMapper.toJsonString(object);
             } else {
-                return "";
+                return null;
             }
         } catch (RuntimeException e) {
             logger.error("Exception while parsing and completing log message, details: " + e + "\n" + message);
-            return "";
+            return null;
         }
     }
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
index 4423f51..5f2100b 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
@@ -3,6 +3,8 @@ package com.zdjizhi.utils.general;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.utils.JsonMapper;
 import com.zdjizhi.utils.StringUtil;
@@ -12,6 +14,8 @@ import com.zdjizhi.utils.json.JsonTypeUtils;
 
 import java.util.ArrayList;
 import java.util.Map;
 
+import static com.alibaba.fastjson.serializer.SerializerFeature.WriteMapNullValue;
+
 /**
  * Description: transform/completion utility class
@@ -53,13 +57,19 @@ public class TransFormTypeMap {
                 String param = strings[3];
                 functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
             }
-            return JsonMapper.toJsonString(jsonMap);
+//            return JsonMapper.toJsonString(jsonMap);
+
+            //fastjson test
+            return JSONObject.toJSONString(jsonMap,
+                    SerializerFeature.DisableCircularReferenceDetect
+                    ,SerializerFeature.WriteNullStringAsEmpty
+                    ,SerializerFeature.WriteNullNumberAsZero);
         } else {
-            return "";
+            return null;
         }
     } catch (RuntimeException e) {
         logger.error("Exception while parsing and completing log message, details: " + e + "\n" + message);
-        return "";
+        return null;
     }
 }
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
index 6c495f7..339b7e3 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -5,6 +5,7 @@ import com.zdjizhi.common.FlowWriteConfig;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.config.SslConfigs;
 
@@ -26,7 +27,6 @@ public class Consumer {
         properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
         properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("value.deserializer",
                 "org.apache.kafka.common.serialization.StringDeserializer");
-
         CertUtils.chooseCert(FlowWriteConfig.KAFKA_SOURCE_PROTOCOL,properties);
 
         return properties;
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
index f9bee25..1671643 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
@@ -43,6 +43,7 @@ public class Producer {
 
         kafkaProducer.setLogFailuresOnly(false);
 
+        // kafkaProducer.setWriteTimestampToKafka(true);
 
         return kafkaProducer;