提交 雪花算法代码,zookeeper代码

This commit is contained in:
qidaijie
2019-08-09 16:31:25 +08:00
parent b765ffee7b
commit d2e310cda0
3 changed files with 539 additions and 0 deletions

189
SnowFlake/SnowflakeId.java Normal file
View File

@@ -0,0 +1,189 @@
package cn.ac.iie.utils.system;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.zookeeper.DistributedLock;
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
import org.apache.log4j.Logger;
/**
* 雪花算法
*
* @author qidaijie
*/
public class SnowflakeId {
private static Logger logger = Logger.getLogger(SnowflakeId.class);
// ==============================Fields===========================================
/**
* 开始时间截 (2018-08-01 00:00:00) max 17years
*/
private final long twepoch = 1564588800000L;
/**
* 机器id所占的位数
*/
private final long workerIdBits = 6L;
/**
* 数据标识id所占的位数
*/
private final long dataCenterIdBits = 4L;
/**
* 支持的最大机器id结果是3 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
*/
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
/**
* 支持的最大数据标识id结果是15
*/
private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
/**
* 序列在id中占的位数
*/
private final long sequenceBits = 14L;
/**
* 机器ID向左移12位
*/
private final long workerIdShift = sequenceBits;
/**
* 数据标识id向左移17位(14+6)
*/
private final long dataCenterIdShift = sequenceBits + workerIdBits;
/**
* 时间截向左移22位(4+6+14)
*/
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
/**
* 生成序列的掩码这里为16383
*/
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/**
* 工作机器ID(0~63)
*/
private long workerId;
/**
* 数据中心ID(0~15)
*/
private long dataCenterId;
/**
* 毫秒内序列(0~16383)
*/
private long sequence = 0L;
/**
* 上次生成ID的时间截
*/
private long lastTimestamp = -1L;
private static SnowflakeId idWorker;
private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
static {
idWorker = new SnowflakeId();
}
//==============================Constructors=====================================
/**
* 构造函数
*/
private SnowflakeId() {
DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
lock.lock();
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (FlowWriteConfig.DATA_CENTER_ID_NUM > maxDataCenterId || FlowWriteConfig.DATA_CENTER_ID_NUM < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
}
this.workerId = tmpWorkerId;
this.dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
}
// ==============================Methods==========================================
/**
* 获得下一个ID (该方法是线程安全的)
*
* @return SnowflakeId
*/
private synchronized long nextId() {
long timestamp = timeGen();
//如果当前时间小于上一次ID生成的时间戳说明系统时钟回退过这个时候应当抛出异常
if (timestamp < lastTimestamp) {
throw new RuntimeException(
String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}
//如果是同一时间生成的,则进行毫秒内序列
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
//毫秒内序列溢出
if (sequence == 0) {
//阻塞到下一个毫秒,获得新的时间戳
timestamp = tilNextMillis(lastTimestamp);
}
}
//时间戳改变,毫秒内序列重置
else {
sequence = 0L;
}
//上次生成ID的时间截
lastTimestamp = timestamp;
//移位并通过或运算拼到一起组成64位的ID
return ((timestamp - twepoch) << timestampLeftShift)
| (dataCenterId << dataCenterIdShift)
| (workerId << workerIdShift)
| sequence;
}
/**
* 阻塞到下一个毫秒,直到获得新的时间戳
*
* @param lastTimestamp 上次生成ID的时间截
* @return 当前时间戳
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
/**
* 返回以毫秒为单位的当前时间
*
* @return 当前时间(毫秒)
*/
protected long timeGen() {
return System.currentTimeMillis();
}
/**
* 静态工具类
*
* @return
*/
public static Long generateId() {
return idWorker.nextId();
}
}

View File

@@ -0,0 +1,215 @@
package cn.ac.iie.utils.zookeeper;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.system.SnowflakeId;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author qidaijie
*/
public class DistributedLock implements Lock, Watcher {
private static Logger logger = Logger.getLogger(DistributedLock.class);
private ZooKeeper zk = null;
/**
* 根节点
*/
private final String ROOT_LOCK = "/locks";
/**
* 竞争的资源
*/
private String lockName;
/**
* 等待的前一个锁
*/
private String waitLock;
/**
* 当前锁
*/
private String currentLock;
/**
* 计数器
*/
private CountDownLatch countDownLatch;
private int sessionTimeout = 2000;
private List<Exception> exceptionList = new ArrayList<Exception>();
/**
* 配置分布式锁
*
* @param config 连接的url
* @param lockName 竞争资源
*/
public DistributedLock(String config, String lockName) {
this.lockName = lockName;
try {
// 连接zookeeper
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(ROOT_LOCK, false);
if (stat == null) {
// 如果根节点不存在,则创建根节点
zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (IOException | InterruptedException | KeeperException e) {
logger.error("Node already exists!");
}
}
// 节点监视器
@Override
public void process(WatchedEvent event) {
if (this.countDownLatch != null) {
this.countDownLatch.countDown();
}
}
@Override
public void lock() {
if (exceptionList.size() > 0) {
throw new LockException(exceptionList.get(0));
}
try {
if (this.tryLock()) {
System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
} else {
// 等待锁
waitForLock(waitLock, sessionTimeout);
}
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
@Override
public boolean tryLock() {
try {
String splitStr = "_lock_";
if (lockName.contains(splitStr)) {
throw new LockException("锁名有误");
}
// 创建临时有序节点
currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 取所有子节点
List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
// 取出所有lockName的锁
List<String> lockObjects = new ArrayList<String>();
for (String node : subNodes) {
String tmpNode = node.split(splitStr)[0];
if (tmpNode.equals(lockName)) {
lockObjects.add(node);
}
}
Collections.sort(lockObjects);
// 若当前节点为最小节点,则获取锁成功
if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
return true;
}
// 若不是最小节点,则找到自己的前一个节点
String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) {
try {
if (this.tryLock()) {
return true;
}
return waitForLock(waitLock, timeout);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// 等待锁
private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
if (stat != null) {
this.countDownLatch = new CountDownLatch(1);
// 计数等待若等到前一个节点消失则precess中进行countDown停止等待获取锁
this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
}
return true;
}
@Override
public void unlock() {
try {
zk.delete(currentLock, -1);
currentLock = null;
zk.close();
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void lockInterruptibly() throws InterruptedException {
this.lock();
}
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e) {
super(e);
}
public LockException(Exception e) {
super(e);
}
}
public static void main(String[] args) {
ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
Runnable runnable = new Runnable() {
@Override
public void run() {
DistributedLock lock = null;
try {
// lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
// lock.lock();
System.out.println(SnowflakeId.generateId());
} finally {
if (lock != null) {
lock.unlock();
}
}
}
};
for (int i = 0; i < 10; i++) {
Thread t = new Thread(runnable);
t.start();
}
}
}

View File

@@ -0,0 +1,135 @@
package cn.ac.iie.utils.zookeeper;
import cn.ac.iie.common.FlowWriteConfig;
import org.apache.commons.lang3.RandomUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author qidaijie
*/
public class ZookeeperUtils implements Watcher {
private static Logger logger = Logger.getLogger(ZookeeperUtils.class);
private ZooKeeper zookeeper;
private static final int SESSION_TIME_OUT = 20000;
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
/**
* 修改节点信息
*
* @param path 节点路径
*/
public int modifyNode(String path) {
createNode("/Snowflake", null, ZooDefs.Ids.OPEN_ACL_UNSAFE);
createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
int workerId;
try {
connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS);
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
if (workerId > 55) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
} else {
String result = String.valueOf(workerId + 1);
if (stat != null) {
zookeeper.setData(path, result.getBytes(), stat.getVersion());
} else {
logger.error("Node does not exist!,Can't modify");
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
workerId = RandomUtils.nextInt(56, 63);
} finally {
closeConn();
}
logger.error("工作ID是" + workerId);
return workerId;
}
/**
* 连接zookeeper
*
* @param host 地址
*/
private void connectZookeeper(String host) {
try {
zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 关闭连接
*/
private void closeConn() {
try {
if (zookeeper != null) {
zookeeper.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获取节点内容
*
* @param path 节点路径
* @return 内容/异常null
*/
private String getNodeDate(String path) {
String result = null;
Stat stat = new Stat();
try {
byte[] resByte = zookeeper.getData(path, true, stat);
result = new String(resByte);
} catch (KeeperException | InterruptedException e) {
logger.error("Get node information exception");
e.printStackTrace();
}
return result;
}
/**
* @param path 节点创建的路径
* @param date 节点所存储的数据的byte[]
* @param acls 控制权限策略
*/
private void createNode(String path, byte[] date, List<ACL> acls) {
try {
connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS);
Stat exists = zookeeper.exists(path, true);
if (exists == null) {
zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
} else {
logger.warn("Node already exists!,Don't need to create");
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
} finally {
closeConn();
}
}
}