From d7f3e40340299b7f8f528b0e93e43cb13144b021 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Fri, 11 Jun 2021 11:10:16 +0800 Subject: [PATCH] =?UTF-8?q?1=EF=BC=9A=E5=A2=9E=E5=8A=A0default=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E6=96=87=E4=BB=B6=E3=80=82=202=EF=BC=9A=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E8=A7=A3=E6=9E=90=E5=BC=82=E5=B8=B8=E7=A8=8B=E5=BA=8F?= =?UTF-8?q?=E7=BB=88=E6=AD=A2=E6=80=A7bug=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 6 +++--- ...g.properties => default_config.properties} | 11 +++++++++- properties/service_flow_config.properties | 11 +++++----- .../java/com/zdjizhi/bolt/CompletionBolt.java | 6 ++---- .../zdjizhi/bolt/{kafka => }/LogSendBolt.java | 7 +++---- ...kaProConfig.java => DefaultProConfig.java} | 6 ++++-- .../com/zdjizhi/common/FlowWriteConfig.java | 1 + .../zdjizhi/spout/CustomizedKafkaSpout.java | 6 ++---- .../topology/LogFlowWriteTopology.java | 13 ++++++------ .../java/com/zdjizhi/utils/app/AppUtils.java | 8 +++++--- .../exception/StreamCompletionException.java | 18 ----------------- .../zdjizhi/utils/general/SnowflakeId.java | 3 +-- .../zdjizhi/utils/general/TransFormUtils.java | 20 +++++-------------- .../zdjizhi/utils/general/TransFunction.java | 7 ++++--- .../com/zdjizhi/utils/hbase/HBaseUtils.java | 8 +++++--- .../com/zdjizhi/utils/json/JsonParseUtil.java | 6 ++++++ .../com/zdjizhi/utils/kafka/KafkaLogSend.java | 14 ++++++------- .../utils/system/FlowWriteConfigurations.java | 5 ++--- .../utils/zookeeper/DistributedLock.java | 4 +--- 19 files changed, 74 insertions(+), 86 deletions(-) rename properties/{kafka_config.properties => default_config.properties} (63%) rename src/main/java/com/zdjizhi/bolt/{kafka => }/LogSendBolt.java (89%) rename src/main/java/com/zdjizhi/common/{KafkaProConfig.java => DefaultProConfig.java} (64%) delete mode 100644 src/main/java/com/zdjizhi/utils/exception/StreamCompletionException.java diff --git a/pom.xml b/pom.xml index dc8725c..2d2b363 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.zdjizhi log-stream-completion-schema - v3.21.04.23-appId + v3.21.06.07-Array jar log-stream-completion-schema @@ -34,7 +34,7 @@ - package + install shade @@ -128,7 +128,7 @@ org.apache.storm storm-core ${storm.version} - + provided slf4j-log4j12 diff --git a/properties/kafka_config.properties b/properties/default_config.properties similarity index 63% rename from properties/kafka_config.properties rename to properties/default_config.properties index 9cfa949..a8c1ea0 100644 --- a/properties/kafka_config.properties +++ b/properties/default_config.properties @@ -14,4 +14,13 @@ batch.size=262144 buffer.memory=67108864 #这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576 -max.request.size=5242880 \ No newline at end of file +max.request.size=5242880 + +#worker进程中向外发送消息的缓存大小 +transfer_buffer_size=32 + +#executor线程的接收队列大小;需要为2的倍数 +executor_receive_buffer_size=16384 + +#executor线程的发送队列大小;需要为2的倍数 +executor_send_buffer_size=16384 diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index edcb30e..33c1667 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -18,7 +18,7 @@ ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\ #ip.library=/home/bigdata/topology/dat/ #缃戝叧鐨剆chema浣嶇疆 -schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/security_event_log +schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log #缃戝叧APP_ID 鑾峰彇鎺ュ彛 app.id.http=http://192.168.44.67:9999/open-api/appDicList @@ -26,10 +26,10 @@ app.id.http=http://192.168.44.67:9999/open-api/appDicList #--------------------------------Kafka娑堣垂缁勪俊鎭------------------------------# #kafka 鎺ユ敹鏁版嵁topic -kafka.topic=test +kafka.topic=PROXY-EVENT-LOG #琛ュ叏鏁版嵁 杈撳嚭 topic -results.output.topic=test-result +results.output.topic=PROXY-EVENT-COMPLETED-LOG #璇诲彇topic,瀛樺偍璇pout id鐨勬秷璐筼ffset淇℃伅锛屽彲閫氳繃璇ユ嫇鎵戝懡鍚;鍏蜂綋瀛樺偍offset鐨勪綅缃紝纭畾涓嬫璇诲彇涓嶉噸澶嶇殑鏁版嵁锛 group.id=connection-record-log-20200818-1-test @@ -66,10 +66,10 @@ kafka.bolt.parallelism=6 #鏁版嵁涓績锛圲ID锛 data.center.id.num=15 -#hbase 鏇存柊鏃堕棿 +#hbase 鏇存柊鏃堕棿锛屽濉啓0鍒欎笉鏇存柊缂撳瓨 hbase.tick.tuple.freq.secs=60 -#app_id 鏇存柊鏃堕棿 +#app_id 鏇存柊鏃堕棿锛屽濉啓0鍒欎笉鏇存柊缂撳瓨 app.tick.tuple.freq.secs=60 #--------------------------------榛樿鍊奸厤缃------------------------------# @@ -101,3 +101,4 @@ mail.default.charset=UTF-8 #闇涓嶈琛ュ叏锛屼笉闇瑕佸垯鍘熸牱鏃ュ織杈撳嚭 log.need.complete=yes + diff --git a/src/main/java/com/zdjizhi/bolt/CompletionBolt.java b/src/main/java/com/zdjizhi/bolt/CompletionBolt.java index b20858f..d1ca4fa 100644 --- a/src/main/java/com/zdjizhi/bolt/CompletionBolt.java +++ b/src/main/java/com/zdjizhi/bolt/CompletionBolt.java @@ -1,10 +1,8 @@ package com.zdjizhi.bolt; -import com.zdjizhi.common.FlowWriteConfig; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.exception.StreamCompletionException; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -39,8 +37,8 @@ public class CompletionBolt extends BaseBasicBolt { if (StringUtil.isNotBlank(message)) { basicOutputCollector.emit(new Values(dealCommonMessage(message))); } - } catch (StreamCompletionException e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "鎺ユ敹/瑙f瀽杩囩▼鍑虹幇寮傚父"); + } catch (RuntimeException e) { + logger.error("澶勭悊鍘熷鏃ュ織涓嬪彂杩囩▼寮傚父,寮傚父淇℃伅锛" + e); } } diff --git a/src/main/java/com/zdjizhi/bolt/kafka/LogSendBolt.java b/src/main/java/com/zdjizhi/bolt/LogSendBolt.java similarity index 89% rename from src/main/java/com/zdjizhi/bolt/kafka/LogSendBolt.java rename to src/main/java/com/zdjizhi/bolt/LogSendBolt.java index a61edcf..46c6353 100644 --- a/src/main/java/com/zdjizhi/bolt/kafka/LogSendBolt.java +++ b/src/main/java/com/zdjizhi/bolt/LogSendBolt.java @@ -1,7 +1,6 @@ -package com.zdjizhi.bolt.kafka; +package com.zdjizhi.bolt; import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.exception.StreamCompletionException; import com.zdjizhi.utils.kafka.KafkaLogSend; import com.zdjizhi.utils.system.TupleUtils; import cn.hutool.log.Log; @@ -53,8 +52,8 @@ public class LogSendBolt extends BaseBasicBolt { list.clear(); } } - } catch (StreamCompletionException e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "鏃ュ織鍙戦並afka杩囩▼鍑虹幇寮傚父"); + } catch (RuntimeException e) { + logger.error("琛ュ叏鏃ュ織鍙戦並afka杩囩▼鍑虹幇寮傚父,寮傚父淇℃伅:" + e); } } diff --git a/src/main/java/com/zdjizhi/common/KafkaProConfig.java b/src/main/java/com/zdjizhi/common/DefaultProConfig.java similarity index 64% rename from src/main/java/com/zdjizhi/common/KafkaProConfig.java rename to src/main/java/com/zdjizhi/common/DefaultProConfig.java index 6e7d616..3fdb013 100644 --- a/src/main/java/com/zdjizhi/common/KafkaProConfig.java +++ b/src/main/java/com/zdjizhi/common/DefaultProConfig.java @@ -6,7 +6,7 @@ import com.zdjizhi.utils.system.FlowWriteConfigurations; /** * @author Administrator */ -public class KafkaProConfig { +public class DefaultProConfig { public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries"); @@ -15,6 +15,8 @@ public class KafkaProConfig { public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size"); public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory"); public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size"); - + public static final Integer TRANSFER_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "transfer_buffer_size"); + public static final Integer EXECUTOR_RECEIVE_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "executor_receive_buffer_size"); + public static final Integer EXECUTOR_SEND_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "executor_send_buffer_size"); } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java index 36f2ea4..fc2e116 100644 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -14,6 +14,7 @@ public class FlowWriteConfig { public static final String IS_JSON_KEY_TAG = "$."; public static final String IF_CONDITION_SPLITTER = "="; public static final String MODEL = "remote"; + public static final String PROTOCOL_SPLITTER = "\\."; /** * System */ diff --git a/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java index 150e02c..b2aad25 100644 --- a/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java +++ b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java @@ -4,8 +4,6 @@ import cn.hutool.core.thread.ThreadUtil; import com.zdjizhi.common.FlowWriteConfig; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.KafkaProConfig; -import com.zdjizhi.utils.exception.StreamCompletionException; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -78,8 +76,8 @@ public class CustomizedKafkaSpout extends BaseRichSpout { for (ConsumerRecord record : records) { this.collector.emit(new Values(record.value())); } - } catch (StreamCompletionException e) { - logger.error("KafkaSpout鍙戦佹秷鎭嚭鐜板紓甯!", e); + } catch (RuntimeException e) { + logger.error("KafkaSpout鍙戦佹秷鎭嚭鐜板紓甯,寮傚父淇℃伅:", e); } } diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index ccde5df..036f922 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -2,12 +2,12 @@ package com.zdjizhi.topology; import com.zdjizhi.bolt.CompletionBolt; -import com.zdjizhi.bolt.kafka.LogSendBolt; +import com.zdjizhi.bolt.LogSendBolt; +import com.zdjizhi.common.DefaultProConfig; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.spout.CustomizedKafkaSpout; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.exception.StreamCompletionException; import org.apache.storm.Config; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; @@ -51,8 +51,9 @@ public class LogFlowWriteTopology { private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); - //璁剧疆杩囬珮浼氬鑷村緢澶氶棶棰橈紝濡傚績璺崇嚎绋嬮タ姝汇佸悶鍚愰噺澶у箙涓嬭穼 - topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8); + topologyConfig.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,DefaultProConfig.TRANSFER_BUFFER_SIZE); + topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,DefaultProConfig.EXECUTOR_RECEIVE_BUFFER_SIZE); + topologyConfig.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,DefaultProConfig.EXECUTOR_SEND_BUFFER_SIZE); StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig); } @@ -63,7 +64,7 @@ public class LogFlowWriteTopology { if (need.equals(FlowWriteConfig.LOG_NEED_COMPLETE)) { builder.setBolt("LogCompletionBolt", new CompletionBolt(), FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); - builder.setBolt("CompletionLogSendBolt", new LogSendBolt(), + builder.setBolt("LogSendBolt", new LogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogCompletionBolt"); } else { builder.setBolt("LogSendBolt", new LogSendBolt(), @@ -92,7 +93,7 @@ public class LogFlowWriteTopology { logger.info("鎵ц杩滅▼閮ㄧ讲妯″紡..."); flowWriteTopology.runRemotely(); } - } catch (StreamCompletionException | InterruptedException | InvalidTopologyException | AlreadyAliveException | AuthorizationException e) { + } catch (RuntimeException | InterruptedException | InvalidTopologyException | AlreadyAliveException | AuthorizationException e) { logger.error("Topology Start ERROR! message is:" + e); } } diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java index 08890a6..1193b13 100644 --- a/src/main/java/com/zdjizhi/utils/app/AppUtils.java +++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java @@ -44,7 +44,7 @@ public class AppUtils { */ private AppUtils() { //瀹氭椂鏇存柊 - updateHabaseCache(); + updateAppIdCache(); } /** @@ -92,13 +92,15 @@ public class AppUtils { /** * 楠岃瘉瀹氭椂鍣,姣忛殧涓娈垫椂闂撮獙璇佷竴娆-楠岃瘉鑾峰彇鏂扮殑Cookie */ - private void updateHabaseCache() { + private void updateAppIdCache() { ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { - change(); + if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) { + change(); + } } catch (RuntimeException e) { logger.error("AppUtils update AppCache is error===>{" + e + "}<==="); } diff --git a/src/main/java/com/zdjizhi/utils/exception/StreamCompletionException.java b/src/main/java/com/zdjizhi/utils/exception/StreamCompletionException.java deleted file mode 100644 index 2a31b11..0000000 --- a/src/main/java/com/zdjizhi/utils/exception/StreamCompletionException.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.zdjizhi.utils.exception; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.exception - * @Description: - * @date 2021/3/2510:14 - */ -public class StreamCompletionException extends RuntimeException { - - public StreamCompletionException(Exception e) { - super(e); - } - - public StreamCompletionException(String e) { - super(e); - } -} diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java index 9663dc6..f67a17b 100644 --- a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java +++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java @@ -1,7 +1,6 @@ package com.zdjizhi.utils.general; import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.exception.StreamCompletionException; import com.zdjizhi.utils.zookeeper.DistributedLock; import com.zdjizhi.utils.zookeeper.ZookeeperUtils; import cn.hutool.log.Log; @@ -127,7 +126,7 @@ public class SnowflakeId { } this.workerId = tmpWorkerId; this.dataCenterId = dataCenterIdNum; - } catch (StreamCompletionException e) { + } catch (RuntimeException e) { logger.error("This is not usual error!!!===>>>" + e + "<<<==="); }finally { lock.unlock(); diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java index a5e26fb..66eadde 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java @@ -2,7 +2,6 @@ package com.zdjizhi.utils.general; import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.exception.StreamCompletionException; import com.zdjizhi.utils.json.JsonParseUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -37,11 +36,6 @@ public class TransFormUtils { */ private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); - /** - * 琛ュ叏宸ュ叿绫 - */ -// private static FormatUtils build = new FormatUtils.Builder(false).build(); - /** * IP瀹氫綅搴撳伐鍏风被 */ @@ -61,10 +55,8 @@ public class TransFormUtils { * @return 琛ュ叏鍚庣殑鏃ュ織 */ public static String dealCommonMessage(String message) { - - Object object = JSONObject.parseObject(message, mapObject.getClass()); - try { + Object object = JSONObject.parseObject(message, mapObject.getClass()); for (String[] strings : jobList) { //鐢ㄥ埌鐨勫弬鏁扮殑鍊 Object name = JsonParseUtil.getValue(object, strings[0]); @@ -80,8 +72,8 @@ public class TransFormUtils { functionSet(function, object, appendToKeyName, appendTo, name, param); } return JSONObject.toJSONString(object); - } catch (StreamCompletionException e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "鏃ュ織棰勫鐞嗚繃绋嬪嚭鐜板紓甯"); + } catch (RuntimeException e) { + logger.error("瑙f瀽琛ュ叏鏃ュ織淇℃伅杩囩▼寮傚父,寮傚父淇℃伅:" + e); return ""; } } @@ -105,8 +97,6 @@ public class TransFormUtils { } break; case "snowflake_id": -// JsonParseUtil.setValue(object, appendToKeyName, -// build.getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM)); JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId()); break; case "geo_ip_detail": @@ -150,8 +140,8 @@ public class TransFormUtils { } break; case "app_match": - if ((int) name != 0 && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(Integer.parseInt(name.toString()))); + if (name != null && appendTo == null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString())); } break; case "decode_of_base64": diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java index 9d8a355..bfb71a2 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java @@ -85,11 +85,12 @@ class TransFunction { /** * appId涓庣紦瀛樹腑瀵瑰簲鍏崇郴琛ュ叏appName * - * @param appId id + * @param appIds app id 鍒楄〃 * @return appName */ - static String appMatch(int appId) { - String appName = AppUtils.getAppName(appId); + static String appMatch(String appIds) { + String appId = appIds.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + String appName = AppUtils.getAppName(Integer.parseInt(appId)); if (StringUtil.isBlank(appName)) { logger.warn("AppMap get appName is null, ID is :{}", appId); } diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java index 042a930..89814dc 100644 --- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java +++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java @@ -51,7 +51,7 @@ public class HBaseUtils { //鎷夊彇鎵鏈 getAll(); //瀹氭椂鏇存柊 - updateHabaseCache(); + updateHBaseCache(); } private static void getHbaseConn() { @@ -164,7 +164,7 @@ public class HBaseUtils { /** * 楠岃瘉瀹氭椂鍣,姣忛殧涓娈垫椂闂撮獙璇佷竴娆-楠岃瘉鑾峰彇鏂扮殑Cookie */ - private void updateHabaseCache() { + private void updateHBaseCache() { // ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, // new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build()); ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); @@ -172,7 +172,9 @@ public class HBaseUtils { @Override public void run() { try { - change(); + if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) { + change(); + } } catch (RuntimeException e) { logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<==="); } diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index 10e07d1..e4ee207 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -43,6 +43,9 @@ public class JsonParseUtil { case "long": clazz = long.class; break; + case "array": + clazz = JSONArray.class; + break; case "Integer": clazz = Integer.class; break; @@ -135,6 +138,9 @@ public class JsonParseUtil { if (checkKeepField(filedStr)) { String name = JsonPath.read(filedStr, "$.name").toString(); String type = JsonPath.read(filedStr, "$.type").toString(); + if (type.contains("{")) { + type = JsonPath.read(filedStr, "$.type.type").toString(); + } //缁勫悎鐢ㄦ潵鐢熸垚瀹炰綋绫荤殑map map.put(name, getClassName(type)); } diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java index 87509b4..d4c86fc 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java @@ -1,7 +1,7 @@ package com.zdjizhi.utils.kafka; import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.common.KafkaProConfig; +import com.zdjizhi.common.DefaultProConfig; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import org.apache.kafka.clients.producer.*; @@ -70,12 +70,12 @@ public class KafkaLogSend { properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("acks", FlowWriteConfig.PRODUCER_ACK); - properties.put("retries", KafkaProConfig.RETRIES); - properties.put("linger.ms", KafkaProConfig.LINGER_MS); - properties.put("request.timeout.ms", KafkaProConfig.REQUEST_TIMEOUT_MS); - properties.put("batch.size", KafkaProConfig.BATCH_SIZE); - properties.put("buffer.memory", KafkaProConfig.BUFFER_MEMORY); - properties.put("max.request.size", KafkaProConfig.MAX_REQUEST_SIZE); + properties.put("retries", DefaultProConfig.RETRIES); + properties.put("linger.ms", DefaultProConfig.LINGER_MS); + properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", DefaultProConfig.BATCH_SIZE); + properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY); + properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE); // properties.put("compression.type", FlowWriteConfig.KAFKA_COMPRESSION_TYPE); /** diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java index 837d881..08fa29b 100644 --- a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java +++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java @@ -1,7 +1,6 @@ package com.zdjizhi.utils.system; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.exception.StreamCompletionException; import java.io.IOException; import java.util.Locale; @@ -62,8 +61,8 @@ public final class FlowWriteConfigurations { static { try { propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); - propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("kafka_config.properties")); - } catch (IOException | StreamCompletionException e) { + propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); + } catch (IOException | RuntimeException e) { propKafka = null; propService = null; } diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java index a8a7312..2afab03 100644 --- a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java +++ b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java @@ -2,8 +2,6 @@ package com.zdjizhi.utils.zookeeper; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.exception.StreamCompletionException; -import org.apache.log4j.Logger; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; @@ -136,7 +134,7 @@ public class DistributedLock implements Lock, Watcher { return true; } return waitForLock(waitLock, timeout); - } catch (KeeperException | InterruptedException | StreamCompletionException e) { + } catch (KeeperException | InterruptedException | RuntimeException e) { logger.error("鍒ゆ柇鏄惁閿佸畾寮傚父" + e); } return false;