Add a default configuration file.
Fix a bug where a parsing exception terminated the program.
@@ -97,7 +97,7 @@ public class SnowflakeId {
     /**
      * Maximum permitted clock rollback: 10s
      */
-    private static final long rollBackTime = 10000L;
+    private static final long ROLL_BACK_TIME = 10000L;
 
 
     private static SnowflakeId idWorker;
@@ -143,7 +143,7 @@ public class SnowflakeId {
     private synchronized long nextId() {
         long timestamp = timeGen();
         // Allow a bounded rollback: if the system clock moved back by less than rollBackTime, wait for it to catch up
-        if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) {
+        if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < ROLL_BACK_TIME) {
            timestamp = tilNextMillis(lastTimestamp);
        }
        // If the current time is still earlier than the last ID's timestamp, the clock has rolled back too far and an exception should be thrown
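To make the intent of these two SnowflakeId hunks concrete, here is a minimal sketch of the rollback strategy: a rollback smaller than ROLL_BACK_TIME is waited out until the clock catches up, a larger one is rejected. Everything outside the two guarded checks (bit layout, sequence handling, field names such as lastTimestamp) is an assumption for illustration, not the project's actual class.

```java
// Minimal sketch, assuming a conventional snowflake layout; only the rollback handling mirrors the diff.
public class SnowflakeIdSketch {

    /** Maximum clock rollback that is waited out rather than rejected (10s). */
    private static final long ROLL_BACK_TIME = 10_000L;

    private long lastTimestamp = -1L;
    private long sequence = 0L; // per-millisecond counter, management omitted here

    public synchronized long nextId() {
        long timestamp = System.currentTimeMillis();
        // Small rollback: block until the clock passes the last issued timestamp again.
        if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < ROLL_BACK_TIME) {
            timestamp = tilNextMillis(lastTimestamp);
        }
        // Large rollback: refuse to hand out IDs rather than risk duplicates.
        if (timestamp < lastTimestamp) {
            throw new IllegalStateException("Clock moved backwards by "
                    + (lastTimestamp - timestamp) + " ms, refusing to generate id");
        }
        lastTimestamp = timestamp;
        return (timestamp << 22) | sequence; // bit layout is illustrative only
    }

    private long tilNextMillis(long lastTimestamp) {
        long timestamp = System.currentTimeMillis();
        while (timestamp <= lastTimestamp) {
            timestamp = System.currentTimeMillis();
        }
        return timestamp;
    }
}
```

The rename itself (rollBackTime to ROLL_BACK_TIME) only brings the constant in line with the usual static-final naming convention.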
@@ -35,7 +35,7 @@ public class HBaseUtils {
 
     private static HBaseUtils hBaseUtils;
 
-    private static void getHBaseInstance() {
+    private static void getInstance() {
         hBaseUtils = new HBaseUtils();
     }
 
@@ -47,14 +47,14 @@ public class HBaseUtils {
         zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS;
         hBaseTable = FlowWriteConfig.HBASE_TABLE_NAME;
         // Obtain the connection
-        getHbaseConn();
+        getConnection();
         // Pull all rows
         getAll();
         // Refresh on a schedule
-        updateHBaseCache();
+        updateCache();
     }
 
-    private static void getHbaseConn() {
+    private static void getConnection() {
         try {
             // HBase configuration settings
             Configuration configuration = HBaseConfiguration.create();
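For context on what getConnection() most likely does after HBaseConfiguration.create(): point the client at the ZooKeeper quorum from FlowWriteConfig and open one shared Connection. The property key and factory call are the standard HBase client API; the method shape and error handling here are assumptions.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Sketch only: mirrors the visible start of getConnection(); the rest of the method is assumed.
final class HBaseConnectionSketch {
    static Connection open(String zookeeperQuorum) throws IOException {
        Configuration configuration = HBaseConfiguration.create();
        // ZooKeeper ensemble the HBase client should talk to
        configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
        // A Connection is heavyweight and thread-safe; share one instance per process.
        return ConnectionFactory.createConnection(configuration);
    }
}
```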
@@ -78,7 +78,7 @@ public class HBaseUtils {
      */
     private static void change() {
         if (hBaseUtils == null) {
-            getHBaseInstance();
+            getInstance();
         }
         long nowTime = System.currentTimeMillis();
         timestampsFilter(time - 1000, nowTime + 500);
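change() asks for rows written roughly between time - 1000 and nowTime + 500. The diff does not show how timestampsFilter is implemented; one common way to express such a window with the HBase client is a time-range scan, sketched below under that assumption (table and method names are placeholders).

```java
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

// Hypothetical sketch of a timestamp-bounded scan; not the project's timestampsFilter implementation.
final class TimeRangeScanSketch {
    static void scanWindow(Connection connection, String tableName,
                           long minStamp, long maxStamp) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            scan.setTimeRange(minStamp, maxStamp); // only cells in [minStamp, maxStamp)
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    // update the in-memory cache from each row here
                }
            }
        }
    }
}
```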
@@ -164,7 +164,7 @@ public class HBaseUtils {
     /**
      * Validation timer: runs at a fixed interval to re-validate and fetch a new Cookie
      */
-    private void updateHBaseCache() {
+    private void updateCache() {
         // ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
         //         new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build());
         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
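The commented-out constructor kept in this hunk hints at the intended setup: a single-threaded scheduler whose worker is a named daemon thread (BasicThreadFactory comes from Apache Commons Lang 3). A sketch of that variant follows; the refresh task and the interval are assumptions, since the scheduled body is outside the hunk.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

// Sketch: periodic cache refresh on a named daemon thread (task body and period are assumptions).
final class CacheRefreshSketch {
    void updateCache(Runnable refreshTask) {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder()
                        .namingPattern("hbase-change-pool-%d")
                        .daemon(true) // don't keep the JVM alive just for the refresher
                        .build());
        // Run the refresh every 5 minutes after an initial 5-minute delay (interval is illustrative).
        executorService.scheduleAtFixedRate(refreshTask, 5, 5, TimeUnit.MINUTES);
    }
}
```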
@@ -192,7 +192,7 @@ public class HBaseUtils {
     public static String getAccount(String clientIp) {
 
         if (hBaseUtils == null) {
-            getHBaseInstance();
+            getInstance();
         }
         return subIdMap.get(clientIp);
 
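getAccount(), like change() above, guards the lookup with `if (hBaseUtils == null) getInstance();`, which is lazy but not thread-safe on its own. If these statics can be reached from several threads, one conventional alternative is the initialization-on-demand holder idiom, sketched here; whether HBaseUtils actually needs it depends on code not shown in this diff.

```java
// Sketch of the holder idiom: the JVM guarantees the instance is created once, lazily.
public final class HBaseUtilsSketch {
    private HBaseUtilsSketch() {
        // open connections, load the cache, schedule refreshes...
    }

    private static final class Holder {
        private static final HBaseUtilsSketch INSTANCE = new HBaseUtilsSketch();
    }

    public static HBaseUtilsSketch getInstance() {
        return Holder.INSTANCE;
    }
}
```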
@@ -18,7 +18,6 @@ import java.io.InputStreamReader;
  * @author qidaijie
  */
 public class HttpClientUtil {
-    // private static final int MAX_STR_LEN = 512000;
     private static final Log logger = LogFactory.get();
 
     /**
@@ -124,7 +124,7 @@ public class JsonParseUtil {
      * @return a map used to generate schema-typed objects via reflection
      */
     public static HashMap<String, Class> getMapFromHttp(String http) {
-        HashMap<String, Class> map = new HashMap<>();
+        HashMap<String, Class> map = new HashMap<>(16);
 
         String schema = HttpClientUtil.requestByGetMethod(http);
         Object data = JSON.parseObject(schema).get("data");
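The only functional change here is the explicit initial capacity. 16 happens to be HashMap's default, so this mainly satisfies static-analysis rules; when the expected entry count is known, sizing against the 0.75 load factor avoids intermediate resizes, as in this small illustration (the sizing helper is not part of the project):

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of initial-capacity sizing; the helper is hypothetical.
final class MapSizingSketch {
    static Map<String, Class<?>> presizedMap(int expectedEntries) {
        // Capacity large enough that expectedEntries fit without exceeding the 0.75 load factor.
        int initialCapacity = (int) (expectedEntries / 0.75f) + 1;
        return new HashMap<>(initialCapacity);
    }
}
```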
@@ -76,9 +76,8 @@ public class KafkaLogSend {
         properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
         properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
         properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
         // properties.put("compression.type", FlowWriteConfig.KAFKA_COMPRESSION_TYPE);
 
-        /**
+        /*
         * Kafka throttling configuration - 20201117
         */
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.PRODUCER_CLIENT_ID);
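Setting ProducerConfig.CLIENT_ID_CONFIG on its own only names the producer; the throttling the comment refers to comes from broker-side quotas keyed on that client id. A sketch of the producer side, with placeholder bootstrap and serializer settings where the diff only shows config constants:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

// Sketch: a producer whose traffic can be throttled by a broker-side quota on its client.id.
final class ThrottledProducerSketch {
    static KafkaProducer<String, String> build(String bootstrapServers, String clientId) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); // quota key on the broker
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(properties);
    }
}
```

The matching broker quota would be defined with the Kafka tooling, along the lines of `kafka-configs.sh --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-name <client id>` (plus `--bootstrap-server` or `--zookeeper`, depending on the Kafka version).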
@@ -67,7 +67,9 @@ public class DistributedLock implements Lock, Watcher {
         }
     }
 
-    // Node watcher
+    /**
+     * Node watcher
+     */
     @Override
     public void process(WatchedEvent event) {
         if (this.countDownLatch != null) {
@@ -140,7 +142,15 @@ public class DistributedLock implements Lock, Watcher {
         return false;
     }
 
-    // Wait for the lock
+    /**
+     * Wait for the lock
+     *
+     * @param prev     name of the lock node to wait on
+     * @param waitTime how long to wait
+     * @return
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
     private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
         Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
 
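Reading the two DistributedLock hunks together: process() releases a CountDownLatch when the watched node changes, and waitForLock() registers a watch on the predecessor node via exists() and then blocks for at most waitTime. The sketch below shows that interaction; the latch handling and return values are assumptions where the diff does not show them, and it presumes the lock instance was passed as the ZooKeeper handle's default Watcher.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Sketch of the watch-and-wait pattern; node creation and lock acquisition are omitted.
class DistributedLockSketch implements Watcher {
    private static final String ROOT_LOCK = "/locks"; // assumed root path
    private final ZooKeeper zk;
    private CountDownLatch countDownLatch;

    DistributedLockSketch(ZooKeeper zk) {
        this.zk = zk;
    }

    @Override
    public void process(WatchedEvent event) {
        // The predecessor node changed (typically it was deleted): wake the waiter.
        if (this.countDownLatch != null) {
            this.countDownLatch.countDown();
        }
    }

    boolean waitForLock(String prev, long waitTime)
            throws KeeperException, InterruptedException {
        // true => register the handle's default Watcher (this object, by assumption) on the node.
        Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
        if (stat == null) {
            return true; // predecessor already gone, the lock is ours
        }
        this.countDownLatch = new CountDownLatch(1);
        // Block until the watch fires or the wait time elapses.
        return this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
    }
}
```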