diff --git a/pom.xml b/pom.xml index 6a833fb..42189ea 100644 --- a/pom.xml +++ b/pom.xml @@ -21,13 +21,13 @@ maven-ali http://maven.aliyun.com/nexus/content/groups/public/ - + - - + + - fail - + + diff --git a/properties/default_config.properties b/properties/default_config.properties index c11eeb7..fb9015e 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -11,23 +11,13 @@ request.timeout.ms=30000 batch.size=262144 #Producer端用于缓存消息的缓冲区大小 -#64M -#buffer.memory=67108864 #128M buffer.memory=134217728 #这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576 -#5M -#max.request.size=5242880 #10M max.request.size=10485760 -#kafka SASL验证用户名 -kafka.user=admin - -#kafka SASL及SSL验证密码 -kafka.pin=galaxy2019 - #kafka source connection timeout session.timeout.ms=60000 @@ -43,8 +33,17 @@ hbase.table.name=subscriber_info #邮件默认编码 mail.default.charset=UTF-8 +#0不做任何校验,1强类型校验,2弱类型校验 +log.transform.type=0 + #kafka source protocol; SSL or SASL kafka.source.protocol=SASL #kafka sink protocol; SSL or SASL -kafka.sink.protocol=SASL \ No newline at end of file +kafka.sink.protocol=SSL + +#kafka SASL验证用户名 +kafka.user=admin + +#kafka SASL及SSL验证密码 +kafka.pin=galaxy2019 \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index a0c7fc2..51ecb4d 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,39 +1,37 @@ #--------------------------------鍦板潃閰嶇疆------------------------------# #绠$悊kafka鍦板潃 -input.kafka.servers=192.168.44.12:9094 +source.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094 #绠$悊杈撳嚭kafka鍦板潃 -output.kafka.servers=192.168.44.12:9094 +sink.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094 #zookeeper 鍦板潃 鐢ㄤ簬閰嶇疆log_id -zookeeper.servers=192.168.44.12:2181 +zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181 #hbase zookeeper鍦板潃 鐢ㄤ簬杩炴帴HBase -hbase.zookeeper.servers=192.168.44.12:2181 +hbase.zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181 #--------------------------------HTTP/瀹氫綅搴------------------------------# #瀹氫綅搴撳湴鍧 -tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ +tools.library=D:\\workerspace\\dat\\ #缃戝叧鐨剆chema浣嶇疆 -schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/session_record +schema.http=http://10.224.11.244:9999/metadata/schema/v1/fields/session_record #缃戝叧APP_ID 鑾峰彇鎺ュ彛 -app.id.http=http://192.168.44.12:9999/open-api/appDicList +app.id.http=http://10.224.11.244:9999/open-api/appDicList #--------------------------------Kafka娑堣垂缁勪俊鎭------------------------------# #kafka 鎺ユ敹鏁版嵁topic -#input.kafka.topic=SESSION-RECORD -input.kafka.topic=test +source.kafka.topic=test #琛ュ叏鏁版嵁 杈撳嚭 topic -#output.kafka.topic=SESSION-RECORD-COMPLETED -output.kafka.topic=test-result +sink.kafka.topic=test-result #璇诲彇topic,瀛樺偍璇pout id鐨勬秷璐筼ffset淇℃伅锛屽彲閫氳繃璇ユ嫇鎵戝懡鍚;鍏蜂綋瀛樺偍offset鐨勪綅缃紝纭畾涓嬫璇诲彇涓嶉噸澶嶇殑鏁版嵁锛 -group.id=session-record-log-20210902-1 +group.id=flink-test #鐢熶骇鑰呭帇缂╂ā寮 none or snappy producer.kafka.compression.type=none @@ -44,13 +42,16 @@ producer.ack=1 #--------------------------------topology閰嶇疆------------------------------# #consumer 骞惰搴 -consumer.parallelism=1 +source.parallelism=10 #杞崲鍑芥暟骞惰搴 -transform.parallelism=1 +transform.parallelism=10 + +#kafka producer 骞惰搴 +sink.parallelism=10 #鏁版嵁涓績锛屽彇鍊艰寖鍥(0-63) -data.center.id.num=0 +data.center.id.num=7 #hbase 鏇存柊鏃堕棿锛屽濉啓0鍒欎笉鏇存柊缂撳瓨 hbase.tick.tuple.freq.secs=180 @@ -64,4 +65,4 @@ app.tick.tuple.freq.secs=0 mail.default.charset=UTF-8 #0涓嶉渶瑕佽ˉ鍏ㄥ師鏍疯緭鍑烘棩蹇楋紝1闇瑕佽ˉ鍏 -log.need.complete=1 +log.need.complete=1 \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java index aa3c757..e8c569c 100644 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -19,7 +19,8 @@ public class FlowWriteConfig { /** * System config */ - public static final Integer CONSUMER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "consumer.parallelism"); + public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism"); + public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism"); public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism"); public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs"); @@ -27,6 +28,7 @@ public class FlowWriteConfig { public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete"); public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset"); public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name"); + public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type"); /** * kafka source config @@ -39,13 +41,13 @@ public class FlowWriteConfig { /** * kafka sink config */ - public static final String INPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "input.kafka.servers"); - public static final String OUTPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "output.kafka.servers"); + public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers"); + public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers"); public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers"); public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); - public static final String OUTPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "output.kafka.topic"); - public static final String INPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "input.kafka.topic"); + public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic"); + public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic"); public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack"); public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library"); public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type"); diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index 5c89522..b23492b 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -5,9 +5,10 @@ import cn.hutool.log.LogFactory; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.functions.FilterNullFunction; import com.zdjizhi.utils.functions.MapCompletedFunction; +import com.zdjizhi.utils.functions.ObjectCompletedFunction; +import com.zdjizhi.utils.functions.TypeMapCompletedFunction; import com.zdjizhi.utils.kafka.Consumer; import com.zdjizhi.utils.kafka.Producer; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -27,26 +28,49 @@ public class LogFlowWriteTopology { //寮鍚疌heckpoint锛宨nterval鐢ㄤ簬鎸囧畾checkpoint鐨勮Е鍙戦棿闅(鍗曚綅milliseconds) // environment.enableCheckpointing(5000); + // + environment.setBufferTimeout(5000); + DataStreamSource streamSource = environment.addSource(Consumer.getKafkaConsumer()) - .setParallelism(FlowWriteConfig.CONSUMER_PARALLELISM); + .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM); if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) { - //瀵瑰師濮嬫棩蹇楄繘琛屽鐞嗚ˉ鍏ㄨ浆鎹㈢瓑 - DataStream cleaningLog = streamSource.map(new MapCompletedFunction()).name("TransFormLogs") - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + DataStream cleaningLog; + switch (FlowWriteConfig.LOG_TRANSFORM_TYPE) { + case 0: + //瀵瑰師濮嬫棩蹇楄繘琛屽鐞嗚ˉ鍏ㄨ浆鎹㈢瓑锛屼笉瀵规棩蹇楀瓧娈电被鍨嬪仛鏍¢獙銆 + cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction") + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + break; + case 1: + //瀵瑰師濮嬫棩蹇楄繘琛屽鐞嗚ˉ鍏ㄨ浆鎹㈢瓑锛屽己鍒惰姹傛棩蹇楀瓧娈电被鍨嬩笌schema涓鑷淬 + cleaningLog = streamSource.map(new ObjectCompletedFunction()).name("ObjectCompletedFunction") + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + break; + case 2: + //瀵瑰師濮嬫棩蹇楄繘琛屽鐞嗚ˉ鍏ㄨ浆鎹㈢瓑锛屽鏃ュ織瀛楁绫诲瀷鍋氳嫢鏍¢獙锛屽彲鏍规嵁schema杩涜寮鸿浆銆 + cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction") + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + break; + default: + //瀵瑰師濮嬫棩蹇楄繘琛屽鐞嗚ˉ鍏ㄨ浆鎹㈢瓑锛屼笉瀵规棩蹇楀瓧娈电被鍨嬪仛鏍¢獙銆 + cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction") + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + } + //杩囨护绌烘暟鎹笉鍙戦佸埌Kafka鍐 DataStream result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData") .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); //鍙戦佹暟鎹埌Kafka result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka") - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + .setParallelism(FlowWriteConfig.SINK_PARALLELISM); } else { //杩囨护绌烘暟鎹笉鍙戦佸埌Kafka鍐 DataStream result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData") .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); //鍙戦佹暟鎹埌Kafka result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka") - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); + .setParallelism(FlowWriteConfig.SINK_PARALLELISM); } try { diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java index 5618159..5e5d0b7 100644 --- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java @@ -1,8 +1,7 @@ package com.zdjizhi.utils.functions; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.general.TransFormTypeMap; + +import com.zdjizhi.utils.general.TransFormMap; import org.apache.flink.api.common.functions.MapFunction; @@ -13,16 +12,10 @@ import org.apache.flink.api.common.functions.MapFunction; * @date 2021/5/2715:01 */ public class MapCompletedFunction implements MapFunction { - private static final Log logger = LogFactory.get(); @Override @SuppressWarnings("unchecked") public String map(String logs) { - try { - return TransFormTypeMap.dealCommonMessage(logs); - } catch (RuntimeException e) { - logger.error("瑙f瀽琛ュ叏鏃ュ織淇℃伅杩囩▼寮傚父,寮傚父淇℃伅:" + e + "\n" + logs); - return ""; - } + return TransFormMap.dealCommonMessage(logs); } } diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java index f67b842..239d8db 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java @@ -38,6 +38,7 @@ public class TransFormMap { try { if (StringUtil.isNotBlank(message)) { Map jsonMap = (Map) JsonMapper.fromJsonString(message, Map.class); + JsonParseUtil.dropJsonField(jsonMap); for (String[] strings : jobList) { //鐢ㄥ埌鐨勫弬鏁扮殑鍊 Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); @@ -122,11 +123,6 @@ public class TransFormMap { JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString())); } break; - case "app_match": - if (logValue != null && appendTo == null) { - JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString())); - } - break; case "decode_of_base64": if (logValue != null) { JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param))); @@ -137,6 +133,11 @@ public class TransFormMap { JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param)); } break; + case "app_match": + if (logValue != null && appendTo == null) { + JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString())); + } + break; default: } } diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java index 26795b0..9b776a9 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java @@ -131,11 +131,6 @@ public class TransFormObject { JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString())); } break; - case "app_match": - if (name != null && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString())); - } - break; case "decode_of_base64": if (name != null) { JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param))); @@ -146,6 +141,11 @@ public class TransFormObject { JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), param)); } break; + case "app_match": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString())); + } + break; default: } } diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java index f02abc0..549f3cc 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java @@ -218,11 +218,9 @@ class TransFunction { Object resultA = isJsonValue(object, split[1]); Object resultB = isJsonValue(object, split[2]); if (direction instanceof Number) { -// result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB; result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB); } else if (direction instanceof String) { result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB); -// result = direction.equals(norms[1]) ? resultA : resultB; } } } catch (RuntimeException e) { @@ -249,9 +247,7 @@ class TransFunction { Object resultB = isJsonValue(jsonMap, split[2]); if (direction instanceof Number) { result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB; -// result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB); } else if (direction instanceof String) { -// result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB); result = direction.equals(norms[1]) ? resultA : resultB; } } @@ -261,36 +257,6 @@ class TransFunction { return result; } -// /** -// * IF鍑芥暟瀹炵幇锛岃В鏋愭棩蹇楁瀯寤轰笁鐩繍绠;鍖呭惈鍒ゆ柇鏄惁涓烘暟瀛楄嫢涓烘暟瀛楀垯杞崲涓簂ong绫诲瀷杩斿洖缁撴灉銆 -// * -// * @param jsonMap 鍘熷鏃ュ織 -// * @param ifParam 瀛楁鍚/鏅氬瓧绗︿覆 -// * @return resultA or resultB or null -// */ -// static Object condition(Map jsonMap, String ifParam) { -// try { -// String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); -// String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); -// String direction = isJsonValue(jsonMap, norms[0]); -// if (StringUtil.isNotBlank(direction)) { -// if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { -// String resultA = isJsonValue(jsonMap, split[1]); -// String resultB = isJsonValue(jsonMap, split[2]); -// String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB; -// Matcher isNum = PATTERN.matcher(result); -// if (isNum.matches()) { -// return Long.parseLong(result); -// } else { -// return result; -// } -// } -// } -// } catch (RuntimeException e) { -// logger.error("IF 鍑芥暟鎵ц寮傚父,寮傚父淇℃伅:" + e); -// } -// return null; -// } /** * 璁剧疆鍥哄畾鍊煎嚱鏁 鑻ヤ负鏁板瓧鍒欒浆涓簂ong杩斿洖 diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index bdcc43d..0aacc78 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -206,7 +206,7 @@ public class JsonParseUtil { return isKeepField; } - static void dropJsonField(Map jsonMap) { + public static void dropJsonField(Map jsonMap) { for (String field : dropList) { jsonMap.remove(field); } diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java index 1036fe9..6c495f7 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java @@ -1,5 +1,6 @@ package com.zdjizhi.utils.kafka; +import com.sun.tools.javac.comp.Flow; import com.zdjizhi.common.FlowWriteConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; @@ -18,11 +19,11 @@ import java.util.Properties; public class Consumer { private static Properties createConsumerConfig() { Properties properties = new Properties(); - properties.put("bootstrap.servers", FlowWriteConfig.INPUT_KAFKA_SERVERS); + properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS); properties.put("group.id", FlowWriteConfig.GROUP_ID); - properties.put("session.timeout.ms", "60000"); - properties.put("max.poll.records", "3000"); - properties.put("max.partition.fetch.bytes", "31457280"); + properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS); + properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS); + properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); @@ -32,7 +33,7 @@ public class Consumer { } public static FlinkKafkaConsumer getKafkaConsumer() { - FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.INPUT_KAFKA_TOPIC, + FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC, new SimpleStringSchema(), createConsumerConfig()); kafkaConsumer.setCommitOffsetsOnCheckpoints(false); diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java index e1a5b22..f9bee25 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java @@ -19,7 +19,7 @@ public class Producer { private static Properties createProducerConfig() { Properties properties = new Properties(); - properties.put("bootstrap.servers", FlowWriteConfig.OUTPUT_KAFKA_SERVERS); + properties.put("bootstrap.servers", FlowWriteConfig.SINK_KAFKA_SERVERS); properties.put("acks", FlowWriteConfig.PRODUCER_ACK); properties.put("retries", FlowWriteConfig.RETRIES); properties.put("linger.ms", FlowWriteConfig.LINGER_MS); @@ -37,7 +37,7 @@ public class Producer { public static FlinkKafkaProducer getKafkaProducer() { FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer( - FlowWriteConfig.OUTPUT_KAFKA_TOPIC, + FlowWriteConfig.SINK_KAFKA_TOPIC, new SimpleStringSchema(), createProducerConfig(), Optional.empty());