This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
galaxy-platform-algorithm-s…/Zookeeper/ZookeeperUtils.java

135 lines
3.8 KiB
Java
Raw Permalink Normal View History

package cn.ac.iie.utils.zookeeper;
import cn.ac.iie.common.FlowWriteConfig;
import org.apache.commons.lang3.RandomUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author qidaijie
*/
public class ZookeeperUtils implements Watcher {
private static Logger logger = Logger.getLogger(ZookeeperUtils.class);
private ZooKeeper zookeeper;
private static final int SESSION_TIME_OUT = 20000;
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
/**
* 修改节点信息
*
* @param path 节点路径
*/
public int modifyNode(String path) {
createNode("/Snowflake", null, ZooDefs.Ids.OPEN_ACL_UNSAFE);
createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
int workerId;
try {
2019-08-09 16:50:12 +08:00
connectZookeeper();
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
if (workerId > 55) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
} else {
String result = String.valueOf(workerId + 1);
if (stat != null) {
zookeeper.setData(path, result.getBytes(), stat.getVersion());
} else {
logger.error("Node does not exist!,Can't modify");
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
workerId = RandomUtils.nextInt(56, 63);
} finally {
closeConn();
}
logger.error("工作ID是" + workerId);
return workerId;
}
/**
* 连接zookeeper
*
*/
2019-08-09 16:50:12 +08:00
private void connectZookeeper() {
try {
2019-08-09 16:50:12 +08:00
zookeeper = new ZooKeeper(FlowWriteConfig.ZOOKEEPER_SERVERS, SESSION_TIME_OUT, this);
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 关闭连接
*/
private void closeConn() {
try {
if (zookeeper != null) {
zookeeper.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获取节点内容
*
* @param path 节点路径
* @return 内容/异常null
*/
private String getNodeDate(String path) {
String result = null;
Stat stat = new Stat();
try {
byte[] resByte = zookeeper.getData(path, true, stat);
result = new String(resByte);
} catch (KeeperException | InterruptedException e) {
logger.error("Get node information exception");
e.printStackTrace();
}
return result;
}
/**
* @param path 节点创建的路径
* @param date 节点所存储的数据的byte[]
* @param acls 控制权限策略
*/
private void createNode(String path, byte[] date, List<ACL> acls) {
try {
2019-08-09 16:50:12 +08:00
connectZookeeper();
Stat exists = zookeeper.exists(path, true);
if (exists == null) {
zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
} else {
logger.warn("Node already exists!,Don't need to create");
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
} finally {
closeConn();
}
}
}