Add a default configuration file.
Fix a bug where a parsing exception terminated the program.
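The parse-exception fix replaces every catch of the project's own StreamCompletionException (the class itself is deleted in this commit) with a catch of RuntimeException in CompletionBolt, LogSendBolt, CustomizedKafkaSpout and the utility classes, so an unexpected runtime error while handling a record is logged and the record skipped instead of escaping and terminating the Storm worker. Below is a minimal sketch of that pattern, assuming Storm's BaseBasicBolt API; the class name CompletionBoltSketch is hypothetical, and the real CompletionBolt additionally enriches the message via dealCommonMessage() before emitting.

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Sketch only: the real CompletionBolt enriches the message before emitting;
 * here the record is passed through unchanged to illustrate the exception handling.
 */
public class CompletionBoltSketch extends BaseBasicBolt {

    private static final Log logger = LogFactory.get();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String message = tuple.getString(0);
        try {
            if (message != null && !message.isEmpty()) {
                collector.emit(new Values(message));
            }
        } catch (RuntimeException e) {
            // Before this commit only StreamCompletionException was caught, so any other
            // runtime exception (e.g. a JSON parse error) escaped and brought the worker down.
            logger.error("Failed to process raw log record, skipping it: " + e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

Since BaseBasicBolt acks the tuple automatically once execute() returns, logging and returning is enough to drop the bad record without affecting the rest of the stream.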
pom.xml
@@ -4,7 +4,7 @@
 
     <groupId>com.zdjizhi</groupId>
     <artifactId>log-stream-completion-schema</artifactId>
-    <version>v3.21.04.23-appId</version>
+    <version>v3.21.06.07-Array</version>
     <packaging>jar</packaging>
 
     <name>log-stream-completion-schema</name>
@@ -34,7 +34,7 @@
 
                 <executions>
                     <execution>
-                        <phase>package</phase>
+                        <phase>install</phase>
                         <goals>
                             <goal>shade</goal>
                         </goals>
@@ -128,7 +128,7 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${storm.version}</version>
-            <!--<scope>provided</scope>-->
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>slf4j-log4j12</artifactId>

@@ -14,4 +14,13 @@ batch.size=262144
 buffer.memory=67108864
 
 #这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576
 max.request.size=5242880
+
+#worker进程中向外发送消息的缓存大小
+transfer_buffer_size=32
+
+#executor线程的接收队列大小;需要为2的倍数
+executor_receive_buffer_size=16384
+
+#executor线程的发送队列大小;需要为2的倍数
+executor_send_buffer_size=16384

@@ -18,7 +18,7 @@ ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\
 #ip.library=/home/bigdata/topology/dat/
 
 #网关的schema位置
-schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/security_event_log
+schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log
 
 #网关APP_ID 获取接口
 app.id.http=http://192.168.44.67:9999/open-api/appDicList
@@ -26,10 +26,10 @@ app.id.http=http://192.168.44.67:9999/open-api/appDicList
 #--------------------------------Kafka消费组信息------------------------------#
 
 #kafka 接收数据topic
-kafka.topic=test
+kafka.topic=PROXY-EVENT-LOG
 
 #补全数据 输出 topic
-results.output.topic=test-result
+results.output.topic=PROXY-EVENT-COMPLETED-LOG
 
 #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
 group.id=connection-record-log-20200818-1-test
@@ -66,10 +66,10 @@ kafka.bolt.parallelism=6
 #数据中心(UID)
 data.center.id.num=15
 
-#hbase 更新时间
+#hbase 更新时间,如填写0则不更新缓存
 hbase.tick.tuple.freq.secs=60
 
-#app_id 更新时间
+#app_id 更新时间,如填写0则不更新缓存
 app.tick.tuple.freq.secs=60
 
 #--------------------------------默认值配置------------------------------#
@@ -101,3 +101,4 @@ mail.default.charset=UTF-8
 #需不要补全,不需要则原样日志输出
 log.need.complete=yes
 
+

@@ -1,10 +1,8 @@
 package com.zdjizhi.bolt;
 
-import com.zdjizhi.common.FlowWriteConfig;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -39,8 +37,8 @@ public class CompletionBolt extends BaseBasicBolt {
             if (StringUtil.isNotBlank(message)) {
                 basicOutputCollector.emit(new Values(dealCommonMessage(message)));
             }
-        } catch (StreamCompletionException e) {
-            logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
+        } catch (RuntimeException e) {
+            logger.error("处理原始日志下发过程异常,异常信息:" + e);
         }
     }
 

@@ -1,7 +1,6 @@
-package com.zdjizhi.bolt.kafka;
+package com.zdjizhi.bolt;
 
 import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import com.zdjizhi.utils.kafka.KafkaLogSend;
 import com.zdjizhi.utils.system.TupleUtils;
 import cn.hutool.log.Log;
@@ -53,8 +52,8 @@ public class LogSendBolt extends BaseBasicBolt {
                     list.clear();
                 }
             }
-        } catch (StreamCompletionException e) {
-            logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
+        } catch (RuntimeException e) {
+            logger.error("补全日志发送Kafka过程出现异常,异常信息:" + e);
         }
     }
 

@@ -6,7 +6,7 @@ import com.zdjizhi.utils.system.FlowWriteConfigurations;
 /**
  * @author Administrator
  */
-public class KafkaProConfig {
+public class DefaultProConfig {
 
 
     public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
@@ -15,6 +15,8 @@ public class KafkaProConfig {
     public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
     public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
     public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
+    public static final Integer TRANSFER_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "transfer_buffer_size");
+    public static final Integer EXECUTOR_RECEIVE_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "executor_receive_buffer_size");
+    public static final Integer EXECUTOR_SEND_BUFFER_SIZE = FlowWriteConfigurations.getIntProperty(1, "executor_send_buffer_size");
 
 }

@@ -14,6 +14,7 @@ public class FlowWriteConfig {
     public static final String IS_JSON_KEY_TAG = "$.";
     public static final String IF_CONDITION_SPLITTER = "=";
     public static final String MODEL = "remote";
+    public static final String PROTOCOL_SPLITTER = "\\.";
     /**
      * System
      */

@@ -4,8 +4,6 @@ import cn.hutool.core.thread.ThreadUtil;
 import com.zdjizhi.common.FlowWriteConfig;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.KafkaProConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -78,8 +76,8 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
                 for (ConsumerRecord<String, String> record : records) {
                     this.collector.emit(new Values(record.value()));
                 }
-            } catch (StreamCompletionException e) {
-                logger.error("KafkaSpout发送消息出现异常!", e);
+            } catch (RuntimeException e) {
+                logger.error("KafkaSpout发送消息出现异常,异常信息:", e);
             }
         }
 

@@ -2,12 +2,12 @@ package com.zdjizhi.topology;
 
 
 import com.zdjizhi.bolt.CompletionBolt;
-import com.zdjizhi.bolt.kafka.LogSendBolt;
+import com.zdjizhi.bolt.LogSendBolt;
+import com.zdjizhi.common.DefaultProConfig;
 import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.spout.CustomizedKafkaSpout;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import org.apache.storm.Config;
 import org.apache.storm.generated.AlreadyAliveException;
 import org.apache.storm.generated.AuthorizationException;
@@ -51,8 +51,9 @@ public class LogFlowWriteTopology {
 
     private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
         topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
-        //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌
-        topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
+        topologyConfig.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,DefaultProConfig.TRANSFER_BUFFER_SIZE);
+        topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,DefaultProConfig.EXECUTOR_RECEIVE_BUFFER_SIZE);
+        topologyConfig.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,DefaultProConfig.EXECUTOR_SEND_BUFFER_SIZE);
         StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
     }
 
@@ -63,7 +64,7 @@ public class LogFlowWriteTopology {
         if (need.equals(FlowWriteConfig.LOG_NEED_COMPLETE)) {
             builder.setBolt("LogCompletionBolt", new CompletionBolt(),
                     FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
-            builder.setBolt("CompletionLogSendBolt", new LogSendBolt(),
+            builder.setBolt("LogSendBolt", new LogSendBolt(),
                     FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogCompletionBolt");
         } else {
             builder.setBolt("LogSendBolt", new LogSendBolt(),
@@ -92,7 +93,7 @@ public class LogFlowWriteTopology {
                 logger.info("执行远程部署模式...");
                 flowWriteTopology.runRemotely();
             }
-        } catch (StreamCompletionException | InterruptedException | InvalidTopologyException | AlreadyAliveException | AuthorizationException e) {
+        } catch (RuntimeException | InterruptedException | InvalidTopologyException | AlreadyAliveException | AuthorizationException e) {
            logger.error("Topology Start ERROR! message is:" + e);
         }
     }

@@ -44,7 +44,7 @@ public class AppUtils {
      */
     private AppUtils() {
         //定时更新
-        updateHabaseCache();
+        updateAppIdCache();
     }
 
     /**
@@ -92,13 +92,15 @@ public class AppUtils {
     /**
      * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
      */
-    private void updateHabaseCache() {
+    private void updateAppIdCache() {
         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
         executorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
                 try {
-                    change();
+                    if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) {
+                        change();
+                    }
                 } catch (RuntimeException e) {
                     logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
                 }

@@ -1,18 +0,0 @@
-package com.zdjizhi.utils.exception;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.exception
- * @Description:
- * @date 2021/3/2510:14
- */
-public class StreamCompletionException extends RuntimeException {
-
-    public StreamCompletionException(Exception e) {
-        super(e);
-    }
-
-    public StreamCompletionException(String e) {
-        super(e);
-    }
-}

@@ -1,7 +1,6 @@
 package com.zdjizhi.utils.general;
 
 import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import com.zdjizhi.utils.zookeeper.DistributedLock;
 import com.zdjizhi.utils.zookeeper.ZookeeperUtils;
 import cn.hutool.log.Log;
@@ -127,7 +126,7 @@ public class SnowflakeId {
             }
             this.workerId = tmpWorkerId;
             this.dataCenterId = dataCenterIdNum;
-        } catch (StreamCompletionException e) {
+        } catch (RuntimeException e) {
             logger.error("This is not usual error!!!===>>>" + e + "<<<===");
         }finally {
             lock.unlock();

@@ -2,7 +2,6 @@ package com.zdjizhi.utils.general;
 
 
 import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 import com.zdjizhi.utils.json.JsonParseUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
@@ -37,11 +36,6 @@ public class TransFormUtils {
      */
     private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
 
-    /**
-     * 补全工具类
-     */
-//    private static FormatUtils build = new FormatUtils.Builder(false).build();
-
     /**
      * IP定位库工具类
      */
@@ -61,10 +55,8 @@ public class TransFormUtils {
      * @return 补全后的日志
      */
     public static String dealCommonMessage(String message) {
-
-        Object object = JSONObject.parseObject(message, mapObject.getClass());
-
         try {
+            Object object = JSONObject.parseObject(message, mapObject.getClass());
             for (String[] strings : jobList) {
                 //用到的参数的值
                 Object name = JsonParseUtil.getValue(object, strings[0]);
@@ -80,8 +72,8 @@ public class TransFormUtils {
                 functionSet(function, object, appendToKeyName, appendTo, name, param);
             }
             return JSONObject.toJSONString(object);
-        } catch (StreamCompletionException e) {
-            logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志预处理过程出现异常");
+        } catch (RuntimeException e) {
+            logger.error("解析补全日志信息过程异常,异常信息:" + e);
             return "";
         }
     }
@@ -105,8 +97,6 @@ public class TransFormUtils {
                     }
                     break;
                 case "snowflake_id":
-//                    JsonParseUtil.setValue(object, appendToKeyName,
-//                            build.getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM));
                     JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId());
                     break;
                 case "geo_ip_detail":
@@ -150,8 +140,8 @@ public class TransFormUtils {
                     }
                     break;
                 case "app_match":
-                    if ((int) name != 0 && appendTo == null) {
-                        JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(Integer.parseInt(name.toString())));
+                    if (name != null && appendTo == null) {
+                        JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString()));
                     }
                     break;
                 case "decode_of_base64":

@@ -85,11 +85,12 @@ class TransFunction {
     /**
      * appId与缓存中对应关系补全appName
      *
-     * @param appId id
+     * @param appIds app id 列表
      * @return appName
      */
-    static String appMatch(int appId) {
-        String appName = AppUtils.getAppName(appId);
+    static String appMatch(String appIds) {
+        String appId = appIds.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+        String appName = AppUtils.getAppName(Integer.parseInt(appId));
         if (StringUtil.isBlank(appName)) {
             logger.warn("AppMap get appName is null, ID is :{}", appId);
         }

@@ -51,7 +51,7 @@ public class HBaseUtils {
         //拉取所有
         getAll();
         //定时更新
-        updateHabaseCache();
+        updateHBaseCache();
     }
 
     private static void getHbaseConn() {
@@ -164,7 +164,7 @@ public class HBaseUtils {
     /**
      * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
      */
-    private void updateHabaseCache() {
+    private void updateHBaseCache() {
//        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
//                new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build());
         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
@@ -172,7 +172,9 @@ public class HBaseUtils {
             @Override
             public void run() {
                 try {
-                    change();
+                    if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
+                        change();
+                    }
                 } catch (RuntimeException e) {
                     logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
                 }

@@ -43,6 +43,9 @@ public class JsonParseUtil {
             case "long":
                 clazz = long.class;
                 break;
+            case "array":
+                clazz = JSONArray.class;
+                break;
             case "Integer":
                 clazz = Integer.class;
                 break;
@@ -135,6 +138,9 @@ public class JsonParseUtil {
             if (checkKeepField(filedStr)) {
                 String name = JsonPath.read(filedStr, "$.name").toString();
                 String type = JsonPath.read(filedStr, "$.type").toString();
+                if (type.contains("{")) {
+                    type = JsonPath.read(filedStr, "$.type.type").toString();
+                }
                 //组合用来生成实体类的map
                 map.put(name, getClassName(type));
             }

@@ -1,7 +1,7 @@
 package com.zdjizhi.utils.kafka;
 
 import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.common.KafkaProConfig;
+import com.zdjizhi.common.DefaultProConfig;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import org.apache.kafka.clients.producer.*;
@@ -70,12 +70,12 @@ public class KafkaLogSend {
         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
-        properties.put("retries", KafkaProConfig.RETRIES);
-        properties.put("linger.ms", KafkaProConfig.LINGER_MS);
-        properties.put("request.timeout.ms", KafkaProConfig.REQUEST_TIMEOUT_MS);
-        properties.put("batch.size", KafkaProConfig.BATCH_SIZE);
-        properties.put("buffer.memory", KafkaProConfig.BUFFER_MEMORY);
-        properties.put("max.request.size", KafkaProConfig.MAX_REQUEST_SIZE);
+        properties.put("retries", DefaultProConfig.RETRIES);
+        properties.put("linger.ms", DefaultProConfig.LINGER_MS);
+        properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
+        properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
+        properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
+        properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
//        properties.put("compression.type", FlowWriteConfig.KAFKA_COMPRESSION_TYPE);
 
         /**

@@ -1,7 +1,6 @@
 package com.zdjizhi.utils.system;
 
 import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.StreamCompletionException;
 
 import java.io.IOException;
 import java.util.Locale;
@@ -62,8 +61,8 @@ public final class FlowWriteConfigurations {
     static {
         try {
             propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
-            propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("kafka_config.properties"));
-        } catch (IOException | StreamCompletionException e) {
+            propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+        } catch (IOException | RuntimeException e) {
             propKafka = null;
             propService = null;
         }

@@ -2,8 +2,6 @@ package com.zdjizhi.utils.zookeeper;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.exception.StreamCompletionException;
-import org.apache.log4j.Logger;
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
 
@@ -136,7 +134,7 @@ public class DistributedLock implements Lock, Watcher {
                 return true;
             }
             return waitForLock(waitLock, timeout);
-        } catch (KeeperException | InterruptedException | StreamCompletionException e) {
+        } catch (KeeperException | InterruptedException | RuntimeException e) {
             logger.error("判断是否锁定异常" + e);
         }
         return false;