diff --git a/pom.xml b/pom.xml
index dc8725c..2d2b363 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-stream-completion-schema</artifactId>
- <version>v3.21.04.23-appId</version>
+ <version>v3.21.06.07-Array</version>
<packaging>jar</packaging>
<name>log-stream-completion-schema</name>
@@ -34,7 +34,7 @@
- <phase>package</phase>
+ <phase>install</phase>
<goal>shade</goal>
@@ -128,7 +128,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
-
+ <scope>provided</scope>
<artifactId>slf4j-log4j12</artifactId>
diff --git a/properties/kafka_config.properties b/properties/default_config.properties
similarity index 63%
rename from properties/kafka_config.properties
rename to properties/default_config.properties
index 9cfa949..a8c1ea0 100644
--- a/properties/kafka_config.properties
+++ b/properties/default_config.properties
@@ -14,4 +14,13 @@ batch.size=262144
buffer.memory=67108864
#This parameter determines the maximum size of each request sent to the Kafka server; default 1048576
-max.request.size=5242880
\ No newline at end of file
+max.request.size=5242880
+
+#Buffer size for messages sent out of the worker process
+transfer_buffer_size=32
+
+#Receive queue size of each executor thread; must be a power of 2
+executor_receive_buffer_size=16384
+
+#Send queue size of each executor thread; must be a power of 2
+executor_send_buffer_size=16384
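
The three new properties feed Storm's Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, and TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE (wired up in LogFlowWriteTopology further down). Storm's disruptor queues need power-of-two capacities, so a load-time sanity check is cheap insurance; the sketch below is illustrative only, and BufferSizeCheck is not part of this patch:

import java.io.InputStream;
import java.util.Properties;

public class BufferSizeCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (InputStream in = BufferSizeCheck.class.getClassLoader()
                .getResourceAsStream("default_config.properties")) {
            props.load(in);
        }
        for (String key : new String[]{"executor_receive_buffer_size",
                                       "executor_send_buffer_size"}) {
            int size = Integer.parseInt(props.getProperty(key).trim());
            // Integer.bitCount(size) == 1 exactly when size is a power of two.
            if (size <= 0 || Integer.bitCount(size) != 1) {
                throw new IllegalArgumentException(key + " must be a power of 2, got " + size);
            }
        }
    }
}
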
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index edcb30e..33c1667 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -18,7 +18,7 @@ ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\
#ip.library=/home/bigdata/topology/dat/
#Location of the gateway schema
-schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/security_event_log
+schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log
#Gateway APP_ID fetch API
app.id.http=http://192.168.44.67:9999/open-api/appDicList
@@ -26,10 +26,10 @@ app.id.http=http://192.168.44.67:9999/open-api/appDicList
#--------------------------------Kafka consumer group info------------------------------#
#Kafka topic for receiving data
-kafka.topic=test
+kafka.topic=PROXY-EVENT-LOG
#Output topic for completed data
-results.output.topic=test-result
+results.output.topic=PROXY-EVENT-COMPLETED-LOG
#Consumer group for the topic; stores the consumed offsets for this spout id and can be named after the topology; the stored offset location ensures the next read does not re-consume data
group.id=connection-record-log-20200818-1-test
@@ -66,10 +66,10 @@ kafka.bolt.parallelism=6
#Data center (UID)
data.center.id.num=15
-#hbase update interval
+#hbase update interval; if set to 0 the cache is not refreshed
hbase.tick.tuple.freq.secs=60
-#app_id update interval
+#app_id update interval; if set to 0 the cache is not refreshed
app.tick.tuple.freq.secs=60
#--------------------------------Default values------------------------------#
@@ -101,3 +101,4 @@ mail.default.charset=UTF-8
#Whether logs need completion; if not, they are output unchanged
log.need.complete=yes
+
diff --git a/src/main/java/com/zdjizhi/bolt/CompletionBolt.java b/src/main/java/com/zdjizhi/bolt/CompletionBolt.java
index b20858f..d1ca4fa 100644
--- a/src/main/java/com/zdjizhi/bolt/CompletionBolt.java
+++ b/src/main/java/com/zdjizhi/bolt/CompletionBolt.java
@@ -1,10 +1,8 @@
package com.zdjizhi.bolt;
-import com.zdjizhi.common.FlowWriteConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.StreamCompletionException;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -39,8 +37,8 @@ public class CompletionBolt extends BaseBasicBolt {
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(dealCommonMessage(message)));
}
- } catch (StreamCompletionException e) {
- logger.error(FlowWriteConfig.KAFKA_TOPIC + " receive/parse exception");
+ } catch (RuntimeException e) {
+ logger.error("澶勭悊鍘熷鏃ュ織涓嬪彂杩囩▼寮傚父,寮傚父淇℃伅锛" + e);
}
}
diff --git a/src/main/java/com/zdjizhi/bolt/kafka/LogSendBolt.java b/src/main/java/com/zdjizhi/bolt/LogSendBolt.java
similarity index 89%
rename from src/main/java/com/zdjizhi/bolt/kafka/LogSendBolt.java
rename to src/main/java/com/zdjizhi/bolt/LogSendBolt.java
index a61edcf..46c6353 100644
--- a/src/main/java/com/zdjizhi/bolt/kafka/LogSendBolt.java
+++ b/src/main/java/com/zdjizhi/bolt/LogSendBolt.java
@@ -1,7 +1,6 @@
-package com.zdjizhi.bolt.kafka;
+package com.zdjizhi.bolt;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
import com.zdjizhi.utils.kafka.KafkaLogSend;
import com.zdjizhi.utils.system.TupleUtils;
import cn.hutool.log.Log;
@@ -53,8 +52,8 @@ public class LogSendBolt extends BaseBasicBolt {
list.clear();
}
}
- } catch (StreamCompletionException e) {
- logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while sending logs to Kafka");
+ } catch (RuntimeException e) {
+ logger.error("琛ュ叏鏃ュ織鍙戦並afka杩囩▼鍑虹幇寮傚父,寮傚父淇℃伅:" + e);
}
}
diff --git a/src/main/java/com/zdjizhi/common/KafkaProConfig.java b/src/main/java/com/zdjizhi/common/DefaultProConfig.java
similarity index 64%
rename from src/main/java/com/zdjizhi/common/KafkaProConfig.java
rename to src/main/java/com/zdjizhi/common/DefaultProConfig.java
index 6e7d616..3fdb013 100644
--- a/src/main/java/com/zdjizhi/common/KafkaProConfig.java
+++ b/src/main/java/com/zdjizhi/common/DefaultProConfig.java
@@ -6,7 +6,7 @@ import com.zdjizhi.utils.system.FlowWriteConfigurations;
/**
* @author Administrator
*/
-public class KafkaProConfig {
+public class DefaultProConfig {
public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
@@ -15,6 +15,8 @@ public class KafkaProConfig {
public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
-
+ public static final Integer TRANSFER_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "transfer_buffer_size");
+ public static final Integer EXECUTOR_RECEIVE_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "executor_receive_buffer_size");
+ public static final Integer EXECUTOR_SEND_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "executor_send_buffer_size");
}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 36f2ea4..fc2e116 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -14,6 +14,7 @@ public class FlowWriteConfig {
public static final String IS_JSON_KEY_TAG = "$.";
public static final String IF_CONDITION_SPLITTER = "=";
public static final String MODEL = "remote";
+ public static final String PROTOCOL_SPLITTER = "\\.";
/**
* System
*/
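
PROTOCOL_SPLITTER is handed to String.split, which takes a regular expression, hence the escaped dot. A small illustration of why the escape matters (SplitterDemo and the sample string are made up):

public class SplitterDemo {
    public static void main(String[] args) {
        // String.split takes a regex; "\\." matches a literal dot.
        String[] parts = "http.request.host".split("\\.");
        System.out.println(parts.length);             // 3 -> [http, request, host]
        System.out.println("abc".split(".").length);  // 0 -- an unescaped "." matches every character
    }
}
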
diff --git a/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
index 150e02c..b2aad25 100644
--- a/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
+++ b/src/main/java/com/zdjizhi/spout/CustomizedKafkaSpout.java
@@ -4,8 +4,6 @@ import cn.hutool.core.thread.ThreadUtil;
import com.zdjizhi.common.FlowWriteConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.KafkaProConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -78,8 +76,8 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
for (ConsumerRecord record : records) {
this.collector.emit(new Values(record.value()));
}
- } catch (StreamCompletionException e) {
- logger.error("KafkaSpout鍙戦佹秷鎭嚭鐜板紓甯!", e);
+ } catch (RuntimeException e) {
+ logger.error("KafkaSpout鍙戦佹秷鎭嚭鐜板紓甯,寮傚父淇℃伅:", e);
}
}
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index ccde5df..036f922 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -2,12 +2,12 @@ package com.zdjizhi.topology;
import com.zdjizhi.bolt.CompletionBolt;
-import com.zdjizhi.bolt.kafka.LogSendBolt;
+import com.zdjizhi.bolt.LogSendBolt;
+import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.spout.CustomizedKafkaSpout;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.exception.StreamCompletionException;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
@@ -51,8 +51,9 @@ public class LogFlowWriteTopology {
private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
- //Setting this too high causes many problems, e.g. heartbeat thread starvation and a sharp drop in throughput
- topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
+ topologyConfig.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, DefaultProConfig.TRANSFER_BUFFER_SIZE);
+ topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, DefaultProConfig.EXECUTOR_RECEIVE_BUFFER_SIZE);
+ topologyConfig.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, DefaultProConfig.EXECUTOR_SEND_BUFFER_SIZE);
StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
}
@@ -63,7 +64,7 @@ public class LogFlowWriteTopology {
if (need.equals(FlowWriteConfig.LOG_NEED_COMPLETE)) {
builder.setBolt("LogCompletionBolt", new CompletionBolt(),
FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
- builder.setBolt("CompletionLogSendBolt", new LogSendBolt(),
+ builder.setBolt("LogSendBolt", new LogSendBolt(),
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogCompletionBolt");
} else {
builder.setBolt("LogSendBolt", new LogSendBolt(),
@@ -92,7 +93,7 @@ public class LogFlowWriteTopology {
logger.info("鎵ц杩滅▼閮ㄧ讲妯″紡...");
flowWriteTopology.runRemotely();
}
- } catch (StreamCompletionException | InterruptedException | InvalidTopologyException | AlreadyAliveException | AuthorizationException e) {
+ } catch (RuntimeException | InterruptedException | InvalidTopologyException | AlreadyAliveException | AuthorizationException e) {
logger.error("Topology Start ERROR! message is:" + e);
}
}
diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
index 08890a6..1193b13 100644
--- a/src/main/java/com/zdjizhi/utils/app/AppUtils.java
+++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
@@ -44,7 +44,7 @@ public class AppUtils {
*/
private AppUtils() {
//Scheduled update
- updateHabaseCache();
+ updateAppIdCache();
}
/**
@@ -92,13 +92,15 @@ public class AppUtils {
/**
* Validation timer; validates once per interval - fetches a new Cookie
*/
- private void updateHabaseCache() {
+ private void updateAppIdCache() {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
- change();
+ if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) {
+ change();
+ }
} catch (RuntimeException e) {
logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
}
diff --git a/src/main/java/com/zdjizhi/utils/exception/StreamCompletionException.java b/src/main/java/com/zdjizhi/utils/exception/StreamCompletionException.java
deleted file mode 100644
index 2a31b11..0000000
--- a/src/main/java/com/zdjizhi/utils/exception/StreamCompletionException.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.zdjizhi.utils.exception;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.exception
- * @Description:
- * @date 2021/3/2510:14
- */
-public class StreamCompletionException extends RuntimeException {
-
- public StreamCompletionException(Exception e) {
- super(e);
- }
-
- public StreamCompletionException(String e) {
- super(e);
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
index 9663dc6..f67a17b 100644
--- a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
+++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
@@ -1,7 +1,6 @@
package com.zdjizhi.utils.general;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
import com.zdjizhi.utils.zookeeper.DistributedLock;
import com.zdjizhi.utils.zookeeper.ZookeeperUtils;
import cn.hutool.log.Log;
@@ -127,7 +126,7 @@ public class SnowflakeId {
}
this.workerId = tmpWorkerId;
this.dataCenterId = dataCenterIdNum;
- } catch (StreamCompletionException e) {
+ } catch (RuntimeException e) {
logger.error("This is not usual error!!!===>>>" + e + "<<<===");
}finally {
lock.unlock();
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
index a5e26fb..66eadde 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java
@@ -2,7 +2,6 @@ package com.zdjizhi.utils.general;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
import com.zdjizhi.utils.json.JsonParseUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -37,11 +36,6 @@ public class TransFormUtils {
*/
private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
- /**
- * Completion utility class
- */
-// private static FormatUtils build = new FormatUtils.Builder(false).build();
-
/**
* IP geolocation library utility class
*/
@@ -61,10 +55,8 @@ public class TransFormUtils {
* @return the completed log
*/
public static String dealCommonMessage(String message) {
-
- Object object = JSONObject.parseObject(message, mapObject.getClass());
-
try {
+ Object object = JSONObject.parseObject(message, mapObject.getClass());
for (String[] strings : jobList) {
//Value of the referenced parameter
Object name = JsonParseUtil.getValue(object, strings[0]);
@@ -80,8 +72,8 @@ public class TransFormUtils {
functionSet(function, object, appendToKeyName, appendTo, name, param);
}
return JSONObject.toJSONString(object);
- } catch (StreamCompletionException e) {
- logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception during log pre-processing");
+ } catch (RuntimeException e) {
+ logger.error("瑙f瀽琛ュ叏鏃ュ織淇℃伅杩囩▼寮傚父,寮傚父淇℃伅:" + e);
return "";
}
}
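
Moving parseObject inside the try block matters because fastjson (the library behind JSONObject here) signals malformed input with JSONException, a RuntimeException: parsed outside the try, one bad record escaped the catch and could crash the executor; inside it, the record is logged and dropped. A standalone reproduction sketch, assuming fastjson on the classpath:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

public class ParseGuardDemo {
    public static void main(String[] args) {
        String bad = "{not valid json";
        try {
            Object obj = JSON.parse(bad);  // throws JSONException, a RuntimeException
            System.out.println(obj);
        } catch (JSONException e) {
            // Mirrors the patched dealCommonMessage: log and fall through to "".
            System.out.println("dropped malformed record: " + e.getMessage());
        }
    }
}
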
@@ -105,8 +97,6 @@ public class TransFormUtils {
}
break;
case "snowflake_id":
-// JsonParseUtil.setValue(object, appendToKeyName,
-// build.getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM));
JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId());
break;
case "geo_ip_detail":
@@ -150,8 +140,8 @@ public class TransFormUtils {
}
break;
case "app_match":
- if ((int) name != 0 && appendTo == null) {
- JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(Integer.parseInt(name.toString())));
+ if (name != null && appendTo == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString()));
}
break;
case "decode_of_base64":
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index 9d8a355..bfb71a2 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -85,11 +85,12 @@ class TransFunction {
/**
* Completes appName from the appId-to-name mapping in the cache
*
- * @param appId id
+ * @param appIds app id list
* @return appName
*/
- static String appMatch(int appId) {
- String appName = AppUtils.getAppName(appId);
+ static String appMatch(String appIds) {
+ String appId = appIds.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+ String appName = AppUtils.getAppName(Integer.parseInt(appId));
if (StringUtil.isBlank(appName)) {
logger.warn("AppMap get appName is null, ID is :{}", appId);
}
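
appMatch now takes the raw field value as a delimited list of app ids and resolves only the first entry. The delimiter in the sketch below is an assumption (the real one is FlowWriteConfig.FORMAT_SPLITTER, whose value this diff does not show); note that Integer.parseInt throws NumberFormatException on a non-numeric id, which the RuntimeException catch in dealCommonMessage absorbs:

public class AppMatchDemo {
    public static void main(String[] args) {
        String appIds = "1001,1002,1003";     // hypothetical sample value
        String first = appIds.split(",")[0];  // only the first id is matched
        int appId = Integer.parseInt(first);  // NumberFormatException if non-numeric
        System.out.println(appId);            // 1001
    }
}
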
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
index 042a930..89814dc 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
@@ -51,7 +51,7 @@ public class HBaseUtils {
//Fetch everything
getAll();
//Scheduled update
- updateHabaseCache();
+ updateHBaseCache();
}
private static void getHbaseConn() {
@@ -164,7 +164,7 @@ public class HBaseUtils {
/**
* Validation timer; validates once per interval - fetches a new Cookie
*/
- private void updateHabaseCache() {
+ private void updateHBaseCache() {
// ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
// new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build());
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
@@ -172,7 +172,9 @@ public class HBaseUtils {
@Override
public void run() {
try {
- change();
+ if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
+ change();
+ }
} catch (RuntimeException e) {
logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
}
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index 10e07d1..e4ee207 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -43,6 +43,9 @@ public class JsonParseUtil {
case "long":
clazz = long.class;
break;
+ case "array":
+ clazz = JSONArray.class;
+ break;
case "Integer":
clazz = Integer.class;
break;
@@ -135,6 +138,9 @@ public class JsonParseUtil {
if (checkKeepField(filedStr)) {
String name = JsonPath.read(filedStr, "$.name").toString();
String type = JsonPath.read(filedStr, "$.type").toString();
+ if (type.contains("{")) {
+ type = JsonPath.read(filedStr, "$.type.type").toString();
+ }
//Assemble the map used to generate the entity class
map.put(name, getClassName(type));
}
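
The new branch handles schema fields whose "type" is an object rather than a plain string, e.g. an Avro-style array declaration; in that case the toString of the JsonPath result contains "{" and the element kind sits one level deeper at $.type.type. A sketch with a made-up two-field schema:

import com.jayway.jsonpath.JsonPath;

public class SchemaTypeDemo {
    public static void main(String[] args) {
        String plain  = "{\"name\":\"app_id\",\"type\":\"long\"}";
        String nested = "{\"name\":\"tags\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}";

        String type = JsonPath.read(nested, "$.type").toString(); // "{type=array, items=string}"
        if (type.contains("{")) {
            type = JsonPath.read(nested, "$.type.type").toString(); // "array"
        }
        System.out.println(JsonPath.read(plain, "$.type").toString() + " / " + type); // long / array
    }
}
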
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java
index 87509b4..d4c86fc 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaLogSend.java
@@ -1,7 +1,7 @@
package com.zdjizhi.utils.kafka;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.common.KafkaProConfig;
+import com.zdjizhi.common.DefaultProConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.kafka.clients.producer.*;
@@ -70,12 +70,12 @@ public class KafkaLogSend {
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
- properties.put("retries", KafkaProConfig.RETRIES);
- properties.put("linger.ms", KafkaProConfig.LINGER_MS);
- properties.put("request.timeout.ms", KafkaProConfig.REQUEST_TIMEOUT_MS);
- properties.put("batch.size", KafkaProConfig.BATCH_SIZE);
- properties.put("buffer.memory", KafkaProConfig.BUFFER_MEMORY);
- properties.put("max.request.size", KafkaProConfig.MAX_REQUEST_SIZE);
+ properties.put("retries", DefaultProConfig.RETRIES);
+ properties.put("linger.ms", DefaultProConfig.LINGER_MS);
+ properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
+ properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
// properties.put("compression.type", FlowWriteConfig.KAFKA_COMPRESSION_TYPE);
/**
diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
index 837d881..08fa29b 100644
--- a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
+++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
@@ -1,7 +1,6 @@
package com.zdjizhi.utils.system;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.StreamCompletionException;
import java.io.IOException;
import java.util.Locale;
@@ -62,8 +61,8 @@ public final class FlowWriteConfigurations {
static {
try {
propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
- propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("kafka_config.properties"));
- } catch (IOException | StreamCompletionException e) {
+ propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+ } catch (IOException | RuntimeException e) {
propKafka = null;
propService = null;
}
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
index a8a7312..2afab03 100644
--- a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
+++ b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
@@ -2,8 +2,6 @@ package com.zdjizhi.utils.zookeeper;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.exception.StreamCompletionException;
-import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
@@ -136,7 +134,7 @@ public class DistributedLock implements Lock, Watcher {
return true;
}
return waitForLock(waitLock, timeout);
- } catch (KeeperException | InterruptedException | StreamCompletionException e) {
+ } catch (KeeperException | InterruptedException | RuntimeException e) {
logger.error("鍒ゆ柇鏄惁閿佸畾寮傚父" + e);
}
return false;