Change the workerID acquisition method to use ZooKeeper
Add a ZooKeeper distributed lock
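In outline: on startup each worker takes a ZooKeeper distributed lock, bumps a shared counter node to claim a unique Snowflake workerId, then releases the lock (the Redis-based and hard-coded variants remain commented out in SnowflakeId below). A minimal sketch of the claim step against the raw ZooKeeper client; the counter path and the wrap-around bound come from the diff below, while the class and method names here are illustrative only, not the project's API:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Illustrative sketch, not project code: claim the next workerId from a
// counter node. In the commit this read-modify-write runs inside
// DistributedLock.lock()/unlock() so concurrent workers cannot race.
public class WorkerIdClaimSketch {
    public static int claimWorkerId(ZooKeeper zk, String counterPath, int maxWorkerId)
            throws KeeperException, InterruptedException {
        try {
            // Assumes the parent node (e.g. /Snowflake) already exists.
            zk.create(counterPath, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException ignored) {
            // Counter already initialized by another worker.
        }
        Stat stat = new Stat();
        int current = Integer.parseInt(new String(zk.getData(counterPath, false, stat)));
        // Hand out the current value and store the next one, wrapping at the bound.
        int next = (current >= maxWorkerId) ? 0 : current + 1;
        zk.setData(counterPath, String.valueOf(next).getBytes(), stat.getVersion());
        return current;
    }
}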
@@ -1,6 +1,7 @@
package cn.ac.iie.bolt;

import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.influxdb.InfluxDbUtils;
import cn.ac.iie.utils.system.TupleUtils;
import cn.ac.iie.utils.kafka.KafkaLogNtc;
import com.zdjizhi.utils.StringUtil;
@@ -9,7 +10,9 @@ import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.LinkedList;
@@ -25,6 +28,7 @@ public class NtcLogSendBolt extends BaseBasicBolt {
    private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
    private List<String> list;
    private KafkaLogNtc kafkaLogNtc;
    private static long successfulSum = 0;


    @Override
@@ -39,8 +43,11 @@ public class NtcLogSendBolt extends BaseBasicBolt {
        if (TupleUtils.isTick(tuple)) {
            if (list.size() != 0) {
                kafkaLogNtc.sendMessage(list);
                successfulSum += list.size();
                list.clear();
            }
            basicOutputCollector.emit(new Values(successfulSum));
            successfulSum = 0L;
        } else {
            String message = tuple.getString(0);
            if (StringUtil.isNotBlank(message)) {
@@ -48,6 +55,7 @@ public class NtcLogSendBolt extends BaseBasicBolt {
            }
            if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) {
                kafkaLogNtc.sendMessage(list);
                successfulSum += list.size();
                list.clear();
            }
        }
@@ -66,6 +74,7 @@ public class NtcLogSendBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("suc"));
    }

}

src/main/java/cn/ac/iie/bolt/SummaryBolt.java (new file, 65 lines)
@@ -0,0 +1,65 @@
package cn.ac.iie.bolt;

import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.influxdb.InfluxDbUtils;
import cn.ac.iie.utils.system.TupleUtils;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

import java.util.HashMap;
import java.util.Map;

/**
 * Summary bolt: accumulates the per-batch success counts and writes the total to InfluxDB.
 *
 * @author antlee
 * @date 2018/8/14
 */
public class SummaryBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 4614020687381536301L;
    private static Logger logger = Logger.getLogger(SummaryBolt.class);
    private static long sum = 0L;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {

    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        try {
            if (TupleUtils.isTick(tuple)) {
                InfluxDbUtils.sendKafkaSuccess(sum);
                sum = 0L;
            } else {
                long successfulSum = tuple.getLong(0);
                sum += successfulSum;
            }
        } catch (Exception e) {
            logger.error("Exception while writing the count to InfluxDB", e);
            e.printStackTrace();
        }
    }


    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>(16);
        conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
        return conf;
    }


}
@@ -3,6 +3,7 @@ package cn.ac.iie.topology;

import cn.ac.iie.bolt.ConnCompletionBolt;
import cn.ac.iie.bolt.NtcLogSendBolt;
import cn.ac.iie.bolt.SummaryBolt;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.spout.CustomizedKafkaSpout;
import org.apache.log4j.Logger;
@@ -59,6 +60,7 @@ public class LogFlowWriteTopology {
        builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
        builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
        builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt");
        builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt");
    }

    public static void main(String[] args) throws Exception {

@@ -4,6 +4,8 @@ import cn.ac.iie.bean.SessionRecordLog;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.redis.RedisPollUtils;
import cn.ac.iie.utils.system.SnowflakeId;
import cn.ac.iie.utils.zookeeper.DistributedLock;
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
@@ -29,6 +31,8 @@ public class TransFormUtils {
            .loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
            .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
            .build();
    // private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
    // private static SnowflakeId snowflakeId = new SnowflakeId();

    /**
     * Parse the log and complete missing fields
@@ -54,7 +58,6 @@ public class TransFormUtils {
            logger.error("Exception while parsing the log", e);
            return "";
        }

    }

    /**

@@ -8,6 +8,7 @@ import org.influxdb.dto.Point;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.function.LongFunction;

/**
 * Utility class for writing to InfluxDB
@@ -19,14 +20,29 @@ public class InfluxDbUtils {
    /**
     * Marks a failed write of raw logs to the data-center Kafka
     */
    public static void sendKafkaFail() {
    public static void sendKafkaFail(int discarded) {
        InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD);
        Point point1 = Point.measurement("sendkafkafail")
        Point point1 = Point.measurement("SendKafkaFail")
                .tag("topology", FlowWriteConfig.KAFKA_TOPIC)
                .tag("hostname", getIp())
                .field("state", 1)
                .field("discarded", discarded)
                .build();
        client.write("kafka", "", point1);
        client.write("BusinessMonitor", "", point1);
    }

    /**
     * Marks a successful write of raw logs to the data-center Kafka
     */
    public static void sendKafkaSuccess(Long complete) {
        if (complete != 0) {
            InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD);
            Point point1 = Point.measurement("SendKafkaSuccess")
                    .tag("topology", FlowWriteConfig.KAFKA_TOPIC)
                    .tag("hostname", getIp())
                    .field("complete", complete)
                    .build();
            client.write("BusinessMonitor", "", point1);
        }
    }

    /**
@@ -45,4 +61,9 @@ public class InfluxDbUtils {
        }
    }

    public static void main(String[] args) {
        sendKafkaFail(100);
        sendKafkaSuccess(100L);
    }

}

@@ -54,7 +54,7 @@ public class KafkaLogNtc {
            }
        });
        if (errorSum[0] > FlowWriteConfig.MAX_FAILURE_NUM) {
            logger.error("Exceeded the maximum tolerated failure count; discarding " + list.size() + " records");
            InfluxDbUtils.sendKafkaFail(list.size());
            list.clear();
        }
    }
@@ -75,7 +75,7 @@ public class KafkaLogNtc {
        properties.put("request.timeout.ms", 60000);
        properties.put("batch.size", 262144);
        properties.put("buffer.memory", 33554432);
        // properties.put("compression.type", "snappy");
        properties.put("compression.type", "snappy");
        kafkaProducer = new KafkaProducer<>(properties);
    }

@@ -1,11 +1,9 @@
package cn.ac.iie.utils.system;

import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.kafka.KafkaLogNtc;
import cn.ac.iie.utils.redis.RedisPollUtils;
import cn.ac.iie.utils.zookeeper.DistributedLock;
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.ZooDefs;

/**
 * Snowflake algorithm
@@ -86,33 +84,32 @@ public class SnowflakeId {
     */
    private long lastTimestamp = -1L;


    private static SnowflakeId idWorker;

    private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();

    static {
        ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
        zookeeperUtils.createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        // idWorker = new SnowflakeId(RedisPollUtils.getWorkerId(FlowWriteConfig.KAFKA_TOPIC), FlowWriteConfig.DATA_CENTER_ID_NUM);
        idWorker = new SnowflakeId(zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC), FlowWriteConfig.DATA_CENTER_ID_NUM);
        // idWorker = new SnowflakeId(12, FlowWriteConfig.DATA_CENTER_ID_NUM);
        idWorker = new SnowflakeId();
    }

    //==============================Constructors=====================================

    /**
     * Constructor
     *
     * @param workerId     worker ID (0~63)
     * @param dataCenterId data center ID (0~15)
     */
    private SnowflakeId(int workerId, int dataCenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId));
    private SnowflakeId() {
        DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
        lock.lock();
        int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
        if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
            throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", maxDataCenterId));
        if (FlowWriteConfig.DATA_CENTER_ID_NUM > maxDataCenterId || FlowWriteConfig.DATA_CENTER_ID_NUM < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
        }
        this.workerId = workerId;
        this.dataCenterId = dataCenterId;
        this.workerId = tmpWorkerId;
        this.dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
    }

    // ==============================Methods==========================================
@@ -188,15 +185,5 @@ public class SnowflakeId {
        return idWorker.nextId();
    }

    public static void main(String[] args) {
        while (true) {
            try {
                logger.info(SnowflakeId.generateId());

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java (new file, 215 lines)
@@ -0,0 +1,215 @@
package cn.ac.iie.utils.zookeeper;

import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.system.SnowflakeId;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author qidaijie
 */
public class DistributedLock implements Lock, Watcher {
    private static Logger logger = Logger.getLogger(DistributedLock.class);

    private ZooKeeper zk = null;
    /**
     * Root node
     */
    private final String ROOT_LOCK = "/locks";
    /**
     * The resource being contended for
     */
    private String lockName;
    /**
     * The previous lock to wait on
     */
    private String waitLock;
    /**
     * The current lock
     */
    private String currentLock;
    /**
     * Latch used to wait for the previous node to go away
     */
    private CountDownLatch countDownLatch;

    private int sessionTimeout = 2000;

    private List<Exception> exceptionList = new ArrayList<Exception>();

    /**
     * Set up the distributed lock
     *
     * @param config   connection URL
     * @param lockName contended resource
     */
    public DistributedLock(String config, String lockName) {
        this.lockName = lockName;
        try {
            // Connect to zookeeper
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(ROOT_LOCK, false);
            if (stat == null) {
                // Create the root node if it does not exist yet
                zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException | InterruptedException | KeeperException e) {
            logger.error("Failed to connect to zookeeper or create the root lock node", e);
        }
    }

    // Node watcher
    @Override
    public void process(WatchedEvent event) {
        if (this.countDownLatch != null) {
            this.countDownLatch.countDown();
        }
    }

    @Override
    public void lock() {
        if (exceptionList.size() > 0) {
            throw new LockException(exceptionList.get(0));
        }
        try {
            if (this.tryLock()) {
                System.out.println(Thread.currentThread().getName() + " " + lockName + " acquired the lock");
            } else {
                // Wait for the lock
                waitForLock(waitLock, sessionTimeout);
            }
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    @Override
    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if (lockName.contains(splitStr)) {
                throw new LockException("Invalid lock name");
            }
            // Create an ephemeral sequential node
            currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // Fetch all child nodes
            List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
            // Keep only the nodes competing for this lockName
            List<String> lockObjects = new ArrayList<String>();
            for (String node : subNodes) {
                String tmpNode = node.split(splitStr)[0];
                if (tmpNode.equals(lockName)) {
                    lockObjects.add(node);
                }
            }
            Collections.sort(lockObjects);
            // If the current node is the smallest one, the lock is acquired
            if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
                return true;
            }
            // Otherwise locate our own node in the sorted list and wait on the one before it
            String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
            waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
        return false;
    }


    @Override
    public boolean tryLock(long timeout, TimeUnit unit) {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(waitLock, timeout);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // Wait for the lock
    private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
        Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);

        if (stat != null) {
            this.countDownLatch = new CountDownLatch(1);
            // Block on the latch; when the previous node disappears, process() counts down and we take the lock
            this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
            this.countDownLatch = null;
        }
        return true;
    }

    @Override
    public void unlock() {
        try {
            zk.delete(currentLock, -1);
            currentLock = null;
            zk.close();
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }


    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        public LockException(String e) {
            super(e);
        }

        public LockException(Exception e) {
            super(e);
        }
    }

    public static void main(String[] args) {
        ZookeeperUtils zookeeperUtils = new ZookeeperUtils();

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                DistributedLock lock = null;
                try {
                    // lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
                    // lock.lock();
                    System.out.println(SnowflakeId.generateId());
                } finally {
                    if (lock != null) {
                        lock.unlock();
                    }
                }
            }
        };

        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }
    }
}
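For reference, the intended call pattern around the new lock is lock, critical section, unlock in a finally block; note that unlock() deletes the ephemeral node and also closes the ZooKeeper session, so each DistributedLock instance is single-use. A hedged usage sketch, assuming FlowWriteConfig.ZOOKEEPER_SERVERS points at a reachable ensemble (this sketch is illustrative, not part of the commit):

import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.zookeeper.DistributedLock;
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;

// Illustrative usage only: serialize workerId allocation across JVMs.
public class LockUsageSketch {
    public static void main(String[] args) {
        // "disLocks1" matches the lock name used by the SnowflakeId constructor.
        DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
        lock.lock();
        try {
            int workerId = new ZookeeperUtils().modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
            System.out.println("claimed workerId=" + workerId);
        } finally {
            // Releases the lock and closes the session; the instance
            // cannot be reused after this.
            lock.unlock();
        }
    }
}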
@@ -37,20 +37,26 @@ public class ZookeeperUtils implements Watcher {
     * @param path node path
     */
    public int modifyNode(String path) {
        int workerId;
        createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        int workerId = 0;
        try {
            connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS);
            Stat stat = zookeeper.exists(path, true);
            workerId = Integer.parseInt(getNodeDate(path));
            String result = String.valueOf(workerId + 2);
            if (stat != null) {
                zookeeper.setData(path, result.getBytes(), stat.getVersion());
            } else {
                logger.error("Node does not exist; can't modify");
            if (workerId > 55) {
                workerId = 0;
                zookeeper.setData(path, "1".getBytes(), stat.getVersion());
            } else {
                String result = String.valueOf(workerId + 1);
                if (stat != null) {
                    zookeeper.setData(path, result.getBytes(), stat.getVersion());
                } else {
                    logger.error("Node does not exist; can't modify");
                }
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
            workerId = RandomUtils.nextInt(30, 63);
            workerId = RandomUtils.nextInt(56, 63);
        } finally {
            closeConn();
        }
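The counter and the fallback occupy disjoint ranges: counter-assigned IDs cycle through 0..55, while the exception path draws from RandomUtils.nextInt(56, 63), i.e. 56..62 since the upper bound is exclusive, so a fallback ID taken when ZooKeeper is unreachable can never collide with a counter-assigned one. A hedged illustration of the stored-value progression (not project code):

// A reader that sees 55 is assigned 55 and stores 56; the next reader
// sees 56 > 55, is assigned 0, and stores 1: assigned IDs cycle 0..55.
static int nextStoredValue(int current) {
    return (current > 55) ? 1 : current + 1;
}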
@@ -72,9 +78,14 @@ public class ZookeeperUtils implements Watcher {
        }
    }

    /**
     * Close the connection
     */
    public void closeConn() {
        try {
            zookeeper.close();
            if (zookeeper != null) {
                zookeeper.close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }