This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
galaxy-platform-algorithm-s…/Zookeeper/ZookeeperUtils.java
2019-08-09 16:50:12 +08:00

135 lines
3.8 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cn.ac.iie.utils.zookeeper;
import cn.ac.iie.common.FlowWriteConfig;
import org.apache.commons.lang3.RandomUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author qidaijie
*/
public class ZookeeperUtils implements Watcher {
private static Logger logger = Logger.getLogger(ZookeeperUtils.class);
private ZooKeeper zookeeper;
private static final int SESSION_TIME_OUT = 20000;
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
/**
* 修改节点信息
*
* @param path 节点路径
*/
public int modifyNode(String path) {
createNode("/Snowflake", null, ZooDefs.Ids.OPEN_ACL_UNSAFE);
createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
int workerId;
try {
connectZookeeper();
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
if (workerId > 55) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
} else {
String result = String.valueOf(workerId + 1);
if (stat != null) {
zookeeper.setData(path, result.getBytes(), stat.getVersion());
} else {
logger.error("Node does not exist!,Can't modify");
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
workerId = RandomUtils.nextInt(56, 63);
} finally {
closeConn();
}
logger.error("工作ID是" + workerId);
return workerId;
}
/**
* 连接zookeeper
*
*/
private void connectZookeeper() {
try {
zookeeper = new ZooKeeper(FlowWriteConfig.ZOOKEEPER_SERVERS, SESSION_TIME_OUT, this);
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 关闭连接
*/
private void closeConn() {
try {
if (zookeeper != null) {
zookeeper.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获取节点内容
*
* @param path 节点路径
* @return 内容/异常null
*/
private String getNodeDate(String path) {
String result = null;
Stat stat = new Stat();
try {
byte[] resByte = zookeeper.getData(path, true, stat);
result = new String(resByte);
} catch (KeeperException | InterruptedException e) {
logger.error("Get node information exception");
e.printStackTrace();
}
return result;
}
/**
* @param path 节点创建的路径
* @param date 节点所存储的数据的byte[]
* @param acls 控制权限策略
*/
private void createNode(String path, byte[] date, List<ACL> acls) {
try {
connectZookeeper();
Stat exists = zookeeper.exists(path, true);
if (exists == null) {
zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
} else {
logger.warn("Node already exists!,Don't need to create");
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
} finally {
closeConn();
}
}
}