From a5070711f6281fc686d9924ab10a5bc28fcec3bb Mon Sep 17 00:00:00 2001 From: qidaijie Date: Wed, 7 Aug 2019 16:34:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9workerID=E7=9A=84=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E6=96=B9=E5=BC=8F=E4=B8=BAzookeeper=20=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0zookeeper=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/cn/ac/iie/bolt/NtcLogSendBolt.java | 9 + src/main/java/cn/ac/iie/bolt/SummaryBolt.java | 65 ++++++ .../ac/iie/topology/LogFlowWriteTopology.java | 2 + .../ac/iie/utils/general/TransFormUtils.java | 5 +- .../ac/iie/utils/influxdb/InfluxDbUtils.java | 29 ++- .../cn/ac/iie/utils/kafka/KafkaLogNtc.java | 4 +- .../cn/ac/iie/utils/system/SnowflakeId.java | 43 ++-- .../iie/utils/zookeeper/DistributedLock.java | 215 ++++++++++++++++++ .../iie/utils/zookeeper/ZookeeperUtils.java | 27 ++- .../java/cn/ac/iie/test/DistributedLock.java | 200 ++++++++++++++++ .../java/cn/ac/iie/test/ZookeeperTest.java | 6 +- .../iie/test/zookeeper/DistributedLock.java | 26 +++ .../java/cn/ac/iie/test/zookeeper/test.java | 5 + 13 files changed, 590 insertions(+), 46 deletions(-) create mode 100644 src/main/java/cn/ac/iie/bolt/SummaryBolt.java create mode 100644 src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java create mode 100644 src/test/java/cn/ac/iie/test/DistributedLock.java create mode 100644 src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java create mode 100644 src/test/java/cn/ac/iie/test/zookeeper/test.java diff --git a/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java index dcf62a1..c19acff 100644 --- a/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java +++ b/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java @@ -1,6 +1,7 @@ package cn.ac.iie.bolt; import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.influxdb.InfluxDbUtils; import cn.ac.iie.utils.system.TupleUtils; import cn.ac.iie.utils.kafka.KafkaLogNtc; import com.zdjizhi.utils.StringUtil; @@ -9,7 +10,9 @@ import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import java.util.HashMap; import java.util.LinkedList; @@ -25,6 +28,7 @@ public class NtcLogSendBolt extends BaseBasicBolt { private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); private List list; private KafkaLogNtc kafkaLogNtc; + private static long successfulSum = 0; @Override @@ -39,8 +43,11 @@ public class NtcLogSendBolt extends BaseBasicBolt { if (TupleUtils.isTick(tuple)) { if (list.size() != 0) { kafkaLogNtc.sendMessage(list); + successfulSum += list.size(); list.clear(); } + basicOutputCollector.emit(new Values(successfulSum)); + successfulSum = 0L; } else { String message = tuple.getString(0); if (StringUtil.isNotBlank(message)) { @@ -48,6 +55,7 @@ public class NtcLogSendBolt extends BaseBasicBolt { } if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) { kafkaLogNtc.sendMessage(list); + successfulSum += list.size(); list.clear(); } } @@ -66,6 +74,7 @@ public class NtcLogSendBolt extends BaseBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("suc")); } } diff --git a/src/main/java/cn/ac/iie/bolt/SummaryBolt.java b/src/main/java/cn/ac/iie/bolt/SummaryBolt.java new file mode 100644 index 0000000..a3844b2 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/SummaryBolt.java @@ -0,0 +1,65 @@ +package cn.ac.iie.bolt; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.influxdb.InfluxDbUtils; +import cn.ac.iie.utils.system.TupleUtils; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Point; + +import java.util.HashMap; +import java.util.Map; + +/** + * 统计总数bolt,用于将统计后的数入influxDB + * + * @author antlee + * @date 2018/8/14 + */ +public class SummaryBolt extends BaseBasicBolt { + private static final long serialVersionUID = 4614020687381536301L; + private static Logger logger = Logger.getLogger(SummaryBolt.class); + private static long sum = 0L; + + @Override + public void prepare(Map stormConf, TopologyContext context) { + + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + if (TupleUtils.isTick(tuple)) { + InfluxDbUtils.sendKafkaSuccess(sum); + sum = 0L; + } else { + long successfulSum = tuple.getLong(0); + sum += successfulSum; + } + } catch (Exception e) { + logger.error("计数写入influxDB出现异常 ", e); + e.printStackTrace(); + } + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60); + return conf; + } + + +} diff --git a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java index 8c16b16..af9cd74 100644 --- a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java +++ b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java @@ -3,6 +3,7 @@ package cn.ac.iie.topology; import cn.ac.iie.bolt.ConnCompletionBolt; import cn.ac.iie.bolt.NtcLogSendBolt; +import cn.ac.iie.bolt.SummaryBolt; import cn.ac.iie.common.FlowWriteConfig; import cn.ac.iie.spout.CustomizedKafkaSpout; import org.apache.log4j.Logger; @@ -59,6 +60,7 @@ public class LogFlowWriteTopology { builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt"); + builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt"); } public static void main(String[] args) throws Exception { diff --git a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java index 2d9e9bb..57e461f 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java +++ b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java @@ -4,6 +4,8 @@ import cn.ac.iie.bean.SessionRecordLog; import cn.ac.iie.common.FlowWriteConfig; import cn.ac.iie.utils.redis.RedisPollUtils; import cn.ac.iie.utils.system.SnowflakeId; +import cn.ac.iie.utils.zookeeper.DistributedLock; +import cn.ac.iie.utils.zookeeper.ZookeeperUtils; import com.alibaba.fastjson.JSONObject; import com.zdjizhi.utils.IpLookup; import com.zdjizhi.utils.StringUtil; @@ -29,6 +31,8 @@ public class TransFormUtils { .loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb") .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb") .build(); +// private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); +// private static SnowflakeId snowflakeId = new SnowflakeId(); /** * 解析日志,并补全 @@ -54,7 +58,6 @@ public class TransFormUtils { logger.error("日志解析过程出现异常", e); return ""; } - } /** diff --git a/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java b/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java index c16fffd..f0dfffd 100644 --- a/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java +++ b/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java @@ -8,6 +8,7 @@ import org.influxdb.dto.Point; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.function.LongFunction; /** * 写入influxDB工具类 @@ -19,14 +20,29 @@ public class InfluxDbUtils { /** * 原始日志写入数据中心kafka失败标识 */ - public static void sendKafkaFail() { + public static void sendKafkaFail(int discarded) { InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD); - Point point1 = Point.measurement("sendkafkafail") + Point point1 = Point.measurement("SendKafkaFail") .tag("topology", FlowWriteConfig.KAFKA_TOPIC) .tag("hostname", getIp()) - .field("state", 1) + .field("discarded", discarded) .build(); - client.write("kafka", "", point1); + client.write("BusinessMonitor", "", point1); + } + + /** + * 原始日志写入数据中心kafka失败标识 + */ + public static void sendKafkaSuccess(Long complete) { + if (complete != 0) { + InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD); + Point point1 = Point.measurement("SendKafkaSuccess") + .tag("topology", FlowWriteConfig.KAFKA_TOPIC) + .tag("hostname", getIp()) + .field("complete", complete) + .build(); + client.write("BusinessMonitor", "", point1); + } } /** @@ -45,4 +61,9 @@ public class InfluxDbUtils { } } + public static void main(String[] args) { + sendKafkaFail(100); + sendKafkaSuccess(100L); + } + } diff --git a/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java b/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java index d16c79f..e83943b 100644 --- a/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java +++ b/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java @@ -54,7 +54,7 @@ public class KafkaLogNtc { } }); if (errorSum[0] > FlowWriteConfig.MAX_FAILURE_NUM) { - logger.error("超出最大容忍错误数抛弃数据条数:" + list.size()); + InfluxDbUtils.sendKafkaFail(list.size()); list.clear(); } } @@ -75,7 +75,7 @@ public class KafkaLogNtc { properties.put("request.timeout.ms", 60000); properties.put("batch.size", 262144); properties.put("buffer.memory", 33554432); -// properties.put("compression.type", "snappy"); + properties.put("compression.type", "snappy"); kafkaProducer = new KafkaProducer<>(properties); } diff --git a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java index 6f11ddd..e697202 100644 --- a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java +++ b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java @@ -1,11 +1,9 @@ package cn.ac.iie.utils.system; import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.kafka.KafkaLogNtc; -import cn.ac.iie.utils.redis.RedisPollUtils; +import cn.ac.iie.utils.zookeeper.DistributedLock; import cn.ac.iie.utils.zookeeper.ZookeeperUtils; import org.apache.log4j.Logger; -import org.apache.zookeeper.ZooDefs; /** * 雪花算法 @@ -86,33 +84,32 @@ public class SnowflakeId { */ private long lastTimestamp = -1L; + private static SnowflakeId idWorker; + private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); + static { - ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); - zookeeperUtils.createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC,"0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); -// idWorker = new SnowflakeId(RedisPollUtils.getWorkerId(FlowWriteConfig.KAFKA_TOPIC), FlowWriteConfig.DATA_CENTER_ID_NUM); - idWorker = new SnowflakeId(zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC), FlowWriteConfig.DATA_CENTER_ID_NUM); -// idWorker = new SnowflakeId(12, FlowWriteConfig.DATA_CENTER_ID_NUM); + idWorker = new SnowflakeId(); } //==============================Constructors===================================== /** * 构造函数 - * - * @param workerId 工作ID (0~63) - * @param dataCenterId 数据中心ID (0~15) */ - private SnowflakeId(int workerId, int dataCenterId) { - if (workerId > maxWorkerId || workerId < 0) { - throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId)); + private SnowflakeId() { + DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); + lock.lock(); + int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC); + if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { + throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); } - if (dataCenterId > maxDataCenterId || dataCenterId < 0) { - throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", maxDataCenterId)); + if (FlowWriteConfig.DATA_CENTER_ID_NUM > maxDataCenterId || FlowWriteConfig.DATA_CENTER_ID_NUM < 0) { + throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId)); } - this.workerId = workerId; - this.dataCenterId = dataCenterId; + this.workerId = tmpWorkerId; + this.dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM; } // ==============================Methods========================================== @@ -188,15 +185,5 @@ public class SnowflakeId { return idWorker.nextId(); } - public static void main(String[] args) { - while (true) { - try { - logger.info(SnowflakeId.generateId()); - - } catch (Exception e) { - e.printStackTrace(); - } - } - } } \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java b/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java new file mode 100644 index 0000000..e57b0a5 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java @@ -0,0 +1,215 @@ +package cn.ac.iie.utils.zookeeper; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.system.SnowflakeId; +import org.apache.log4j.Logger; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +/** + * @author qidaijie + */ +public class DistributedLock implements Lock, Watcher { + private static Logger logger = Logger.getLogger(DistributedLock.class); + + private ZooKeeper zk = null; + /** + * 根节点 + */ + private final String ROOT_LOCK = "/locks"; + /** + * 竞争的资源 + */ + private String lockName; + /** + * 等待的前一个锁 + */ + private String waitLock; + /** + * 当前锁 + */ + private String currentLock; + /** + * 计数器 + */ + private CountDownLatch countDownLatch; + + private int sessionTimeout = 2000; + + private List exceptionList = new ArrayList(); + + /** + * 配置分布式锁 + * + * @param config 连接的url + * @param lockName 竞争资源 + */ + public DistributedLock(String config, String lockName) { + this.lockName = lockName; + try { + // 连接zookeeper + zk = new ZooKeeper(config, sessionTimeout, this); + Stat stat = zk.exists(ROOT_LOCK, false); + if (stat == null) { + // 如果根节点不存在,则创建根节点 + zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (IOException | InterruptedException | KeeperException e) { + logger.error("Node already exists!"); + } + } + + // 节点监视器 + @Override + public void process(WatchedEvent event) { + if (this.countDownLatch != null) { + this.countDownLatch.countDown(); + } + } + + @Override + public void lock() { + if (exceptionList.size() > 0) { + throw new LockException(exceptionList.get(0)); + } + try { + if (this.tryLock()) { + System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁"); + } else { + // 等待锁 + waitForLock(waitLock, sessionTimeout); + } + } catch (InterruptedException | KeeperException e) { + e.printStackTrace(); + } + } + + @Override + public boolean tryLock() { + try { + String splitStr = "_lock_"; + if (lockName.contains(splitStr)) { + throw new LockException("锁名有误"); + } + // 创建临时有序节点 + currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + // 取所有子节点 + List subNodes = zk.getChildren(ROOT_LOCK, false); + // 取出所有lockName的锁 + List lockObjects = new ArrayList(); + for (String node : subNodes) { + String tmpNode = node.split(splitStr)[0]; + if (tmpNode.equals(lockName)) { + lockObjects.add(node); + } + } + Collections.sort(lockObjects); + // 若当前节点为最小节点,则获取锁成功 + if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) { + return true; + } + // 若不是最小节点,则找到自己的前一个节点 + String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1); + waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1); + } catch (InterruptedException | KeeperException e) { + e.printStackTrace(); + } + return false; + } + + + @Override + public boolean tryLock(long timeout, TimeUnit unit) { + try { + if (this.tryLock()) { + return true; + } + return waitForLock(waitLock, timeout); + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + // 等待锁 + private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException { + Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true); + + if (stat != null) { + this.countDownLatch = new CountDownLatch(1); + // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁 + this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); + this.countDownLatch = null; + } + return true; + } + + @Override + public void unlock() { + try { + zk.delete(currentLock, -1); + currentLock = null; + zk.close(); + } catch (InterruptedException | KeeperException e) { + e.printStackTrace(); + } + } + + @Override + public Condition newCondition() { + return null; + } + + @Override + public void lockInterruptibly() throws InterruptedException { + this.lock(); + } + + + public class LockException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public LockException(String e) { + super(e); + } + + public LockException(Exception e) { + super(e); + } + } + + public static void main(String[] args) { + ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); + + Runnable runnable = new Runnable() { + @Override + public void run() { + DistributedLock lock = null; + try { +// lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); +// lock.lock(); + System.out.println(SnowflakeId.generateId()); + } finally { + if (lock != null) { + lock.unlock(); + } + } + } + }; + + for (int i = 0; i < 10; i++) { + Thread t = new Thread(runnable); + t.start(); + } + } +} \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java index 5769144..be6a74a 100644 --- a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java +++ b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java @@ -37,20 +37,26 @@ public class ZookeeperUtils implements Watcher { * @param path 节点路径 */ public int modifyNode(String path) { - int workerId; + createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); + int workerId = 0; try { connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS); Stat stat = zookeeper.exists(path, true); workerId = Integer.parseInt(getNodeDate(path)); - String result = String.valueOf(workerId + 2); - if (stat != null) { - zookeeper.setData(path, result.getBytes(), stat.getVersion()); - } else { - logger.error("Node does not exist!,Can't modify"); + if (workerId > 55){ + workerId = 0; + zookeeper.setData(path, "1".getBytes(), stat.getVersion()); + }else { + String result = String.valueOf(workerId + 1); + if (stat != null) { + zookeeper.setData(path, result.getBytes(), stat.getVersion()); + } else { + logger.error("Node does not exist!,Can't modify"); + } } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); - workerId = RandomUtils.nextInt(30, 63); + workerId = RandomUtils.nextInt(56, 63); } finally { closeConn(); } @@ -72,9 +78,14 @@ public class ZookeeperUtils implements Watcher { } } + /** + * 关闭连接 + */ public void closeConn() { try { - zookeeper.close(); + if (zookeeper != null) { + zookeeper.close(); + } } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/src/test/java/cn/ac/iie/test/DistributedLock.java b/src/test/java/cn/ac/iie/test/DistributedLock.java new file mode 100644 index 0000000..030bf4a --- /dev/null +++ b/src/test/java/cn/ac/iie/test/DistributedLock.java @@ -0,0 +1,200 @@ +package cn.ac.iie.test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.zookeeper.ZookeeperUtils; +import org.apache.log4j.Logger; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; + +public class DistributedLock implements Lock, Watcher { + private static Logger logger = Logger.getLogger(DistributedLock.class); + + private ZooKeeper zk = null; + // 根节点 + private String ROOT_LOCK = "/locks"; + // 竞争的资源 + private String lockName; + // 等待的前一个锁 + private String WAIT_LOCK; + // 当前锁 + private String CURRENT_LOCK; + // 计数器 + private CountDownLatch countDownLatch; + private int sessionTimeout = 30000; + private List exceptionList = new ArrayList(); + + /** + * 配置分布式锁 + * + * @param config 连接的url + * @param lockName 竞争资源 + */ + public DistributedLock(String config, String lockName) { + this.lockName = lockName; + try { + // 连接zookeeper + zk = new ZooKeeper(config, sessionTimeout, this); + Stat stat = zk.exists(ROOT_LOCK, false); + if (stat == null) { + // 如果根节点不存在,则创建根节点 + zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (IOException | InterruptedException | KeeperException e) { + logger.error("Node already exists!"); + } + } + + // 节点监视器 + public void process(WatchedEvent event) { + if (this.countDownLatch != null) { + this.countDownLatch.countDown(); + } + } + + public void lock() { + if (exceptionList.size() > 0) { + throw new LockException(exceptionList.get(0)); + } + try { + if (this.tryLock()) { +// System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁"); +// ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); +// zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC); + } else { + // 等待锁 + waitForLock(WAIT_LOCK, sessionTimeout); + } + } catch (InterruptedException | KeeperException e) { + e.printStackTrace(); + } + } + + public boolean tryLock() { + try { + String splitStr = "_lock_"; + if (lockName.contains(splitStr)) { + throw new LockException("锁名有误"); + } + // 创建临时有序节点 + CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); +// System.out.println(CURRENT_LOCK + " 已经创建"); + // 取所有子节点 + List subNodes = zk.getChildren(ROOT_LOCK, false); + // 取出所有lockName的锁 + List lockObjects = new ArrayList(); + for (String node : subNodes) { + String _node = node.split(splitStr)[0]; + if (_node.equals(lockName)) { + lockObjects.add(node); + } + } + Collections.sort(lockObjects); +// System.out.println(Thread.currentThread().getName() + " 的锁是 " + CURRENT_LOCK); + // 若当前节点为最小节点,则获取锁成功 + if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) { + return true; + } + // 若不是最小节点,则找到自己的前一个节点 + String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1); + WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1); + } catch (InterruptedException | KeeperException e) { + e.printStackTrace(); + } + return false; + } + + + @Override + public boolean tryLock(long timeout, TimeUnit unit) { + try { + if (this.tryLock()) { + return true; + } + return waitForLock(WAIT_LOCK, timeout); + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + // 等待锁 + private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException { + Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true); + + if (stat != null) { +// System.out.println(Thread.currentThread().getName() + "等待锁 " + ROOT_LOCK + "/" + prev); + this.countDownLatch = new CountDownLatch(1); + // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁 + this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); + this.countDownLatch = null; +// System.out.println(Thread.currentThread().getName() + " 等到了锁"); + } + return true; + } + + public void unlock() { + try { +// System.out.println("释放锁 " + CURRENT_LOCK); + zk.delete(CURRENT_LOCK, -1); + CURRENT_LOCK = null; + zk.close(); + } catch (InterruptedException | KeeperException e) { + e.printStackTrace(); + } + } + + public Condition newCondition() { + return null; + } + + public void lockInterruptibly() throws InterruptedException { + this.lock(); + } + + + public class LockException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public LockException(String e) { + super(e); + } + + public LockException(Exception e) { + super(e); + } + } + + public static void main(String[] args) { + ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); + + Runnable runnable = new Runnable() { + public void run() { + DistributedLock lock = null; + try { + lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); + lock.lock(); + zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC); + } finally { + if (lock != null) { + lock.unlock(); + } + } + } + }; + + for (int i = 0; i < 10; i++) { + Thread t = new Thread(runnable); + t.start(); + } + } +} \ No newline at end of file diff --git a/src/test/java/cn/ac/iie/test/ZookeeperTest.java b/src/test/java/cn/ac/iie/test/ZookeeperTest.java index fe4e2b3..2cb6c9f 100644 --- a/src/test/java/cn/ac/iie/test/ZookeeperTest.java +++ b/src/test/java/cn/ac/iie/test/ZookeeperTest.java @@ -113,10 +113,10 @@ public class ZookeeperTest implements Watcher { public static void main(String[] args) { ZookeeperTest zookeeperTest = new ZookeeperTest(); try { - zookeeperTest.connectZookeeper("192.168.40.207:2181"); - zookeeperTest.createNode("/Snowflake", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); + zookeeperTest.connectZookeeper("192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181"); +// zookeeperTest.createNode("/Snowflake", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); // System.out.println(zookeeperTest.getNodeDate("/testNode/UID-TEST")); -// zookeeperTest.modifyNode("/testNode/UID-TEST", "2".getBytes()); + zookeeperTest.modifyNode("/Snowflake/SESSION-TEST-LOG", "0".getBytes()); // System.out.println(zookeeperTest.getNodeDate("/testNode/UID-TEST")); } catch (Exception e) { diff --git a/src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java b/src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java new file mode 100644 index 0000000..f442ebe --- /dev/null +++ b/src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java @@ -0,0 +1,26 @@ +package cn.ac.iie.test.zookeeper; + +import java.util.concurrent.TimeUnit; + +public interface DistributedLock { + /** + * 获取锁,如果没有得到就等待 + */ + public void acquire() throws Exception; + + /** + * 获取锁,直到超时 + * + * @param unit time参数的单位 + * @throws Exception + * @return是否获取到锁 + */ + public boolean acquire(long time, TimeUnit unit) throws Exception; + + /** + * 释放锁 + * + * @throws Exception + */ + public void release() throws Exception; +} diff --git a/src/test/java/cn/ac/iie/test/zookeeper/test.java b/src/test/java/cn/ac/iie/test/zookeeper/test.java new file mode 100644 index 0000000..b7cb523 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/zookeeper/test.java @@ -0,0 +1,5 @@ +package cn.ac.iie.test.zookeeper; + +public class test { + +}