3.0 基础版本
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
package cn.ac.iie.bolt;
|
||||
|
||||
import cn.ac.iie.common.FlowWriteConfig;
|
||||
import cn.ac.iie.utils.hbase.HbaseUtils;
|
||||
import cn.ac.iie.utils.system.TupleUtils;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.storm.task.TopologyContext;
|
||||
@@ -10,6 +13,7 @@ import org.apache.storm.tuple.Fields;
|
||||
import org.apache.storm.tuple.Tuple;
|
||||
import org.apache.storm.tuple.Values;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static cn.ac.iie.utils.general.TransFormUtils.getJsonMessage;
|
||||
@@ -31,15 +35,28 @@ public class ConnCompletionBolt extends BaseBasicBolt {
|
||||
@Override
|
||||
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
|
||||
try {
|
||||
String message = tuple.getString(0);
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
basicOutputCollector.emit(new Values(getJsonMessage(message)));
|
||||
if (TupleUtils.isTick(tuple)) {
|
||||
// HbaseUtils.change();
|
||||
} else {
|
||||
String message = tuple.getString(0);
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
basicOutputCollector.emit(new Values(getJsonMessage(message)));
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("接收解析过程出现异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getComponentConfiguration() {
|
||||
Map<String, Object> conf = new HashMap<String, Object>(16);
|
||||
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
|
||||
FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
|
||||
outputFieldsDeclarer.declare(new Fields("connLog"));
|
||||
|
||||
@@ -60,7 +60,7 @@ public class NtcLogSendBolt extends BaseBasicBolt {
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("日志发送Kafka过程出现异常 ", e);
|
||||
logger.error("日志发送Kafka过程出现异常");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
package cn.ac.iie.bolt.radius;
|
||||
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.storm.task.TopologyContext;
|
||||
import org.apache.storm.topology.BasicOutputCollector;
|
||||
import org.apache.storm.topology.OutputFieldsDeclarer;
|
||||
import org.apache.storm.topology.base.BaseBasicBolt;
|
||||
import org.apache.storm.tuple.Fields;
|
||||
import org.apache.storm.tuple.Tuple;
|
||||
import org.apache.storm.tuple.Values;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static cn.ac.iie.utils.general.TransFormUtils.getRadiusMessage;
|
||||
|
||||
/**
|
||||
* 通联关系日志补全
|
||||
*
|
||||
* @author qidaijie
|
||||
*/
|
||||
public class RadiusCompletionBolt extends BaseBasicBolt {
|
||||
private static final long serialVersionUID = -3657802387129063952L;
|
||||
private final static Logger logger = Logger.getLogger(RadiusCompletionBolt.class);
|
||||
|
||||
@Override
|
||||
public void prepare(Map stormConf, TopologyContext context) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
|
||||
try {
|
||||
String message = tuple.getString(0);
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
basicOutputCollector.emit(new Values(getRadiusMessage(message)));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("接收解析过程出现异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
|
||||
outputFieldsDeclarer.declare(new Fields("connLog"));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -40,6 +40,8 @@ public class FlowWriteConfig {
|
||||
*/
|
||||
public static final String BOOTSTRAP_SERVERS = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers");
|
||||
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
|
||||
public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
|
||||
public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(0, "hbase.table.name");
|
||||
public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
|
||||
public static final String RESULTS_OUTPUT_TOPIC = FlowWriteConfigurations.getStringProperty(0, "results.output.topic");
|
||||
public static final String KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "kafka.topic");
|
||||
|
||||
@@ -3,7 +3,7 @@ package cn.ac.iie.topology;
|
||||
|
||||
import cn.ac.iie.bolt.ConnCompletionBolt;
|
||||
import cn.ac.iie.bolt.NtcLogSendBolt;
|
||||
import cn.ac.iie.bolt.SummaryBolt;
|
||||
import cn.ac.iie.bolt.radius.RadiusCompletionBolt;
|
||||
import cn.ac.iie.common.FlowWriteConfig;
|
||||
import cn.ac.iie.spout.CustomizedKafkaSpout;
|
||||
import org.apache.log4j.Logger;
|
||||
@@ -58,8 +58,13 @@ public class LogFlowWriteTopology {
|
||||
private void buildTopology() {
|
||||
builder = new TopologyBuilder();
|
||||
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
|
||||
builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt");
|
||||
if ("RADIUS-LOG".equals(FlowWriteConfig.KAFKA_TOPIC)) {
|
||||
builder.setBolt("RadiusCompletionBolt", new RadiusCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("RadiusCompletionBolt");
|
||||
} else {
|
||||
builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
|
||||
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt");
|
||||
}
|
||||
// builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt");
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package cn.ac.iie.utils.general;
|
||||
|
||||
import cn.ac.iie.bean.SessionRecordLog;
|
||||
import cn.ac.iie.common.FlowWriteConfig;
|
||||
import cn.ac.iie.utils.hbase.HbaseUtils;
|
||||
import cn.ac.iie.utils.redis.RedisPollUtils;
|
||||
import cn.ac.iie.utils.system.SnowflakeId;
|
||||
import cn.ac.iie.utils.zookeeper.DistributedLock;
|
||||
@@ -31,8 +32,6 @@ public class TransFormUtils {
|
||||
.loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
|
||||
.loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
|
||||
.build();
|
||||
// private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
|
||||
// private static SnowflakeId snowflakeId = new SnowflakeId();
|
||||
|
||||
/**
|
||||
* 解析日志,并补全
|
||||
@@ -52,14 +51,40 @@ public class TransFormUtils {
|
||||
sessionRecordLog.setServer_asn(ipLookup.asnLookup(serverIp, true));
|
||||
sessionRecordLog.setDomain(getTopDomain(sessionRecordLog.getSni(), sessionRecordLog.getHost()));
|
||||
sessionRecordLog.setRecv_time(System.currentTimeMillis() / 1000);
|
||||
// sessionRecordLog.setSubscribe_id(getSubscribeId(clientIp));
|
||||
// sessionRecordLog.setSubscribe_id(HbaseUtils.getAccount(clientIp));
|
||||
return JSONObject.toJSONString(sessionRecordLog);
|
||||
} catch (Exception e) {
|
||||
logger.error("日志解析过程出现异常", e);
|
||||
logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常", e);
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 解析日志,并补全
|
||||
*
|
||||
* @param message radius原始日志
|
||||
* @return 补全后的日志
|
||||
*/
|
||||
public static String getRadiusMessage(String message) {
|
||||
SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class);
|
||||
String serverIp = sessionRecordLog.getServer_ip();
|
||||
String clientIp = sessionRecordLog.getClient_ip();
|
||||
try {
|
||||
sessionRecordLog.setUid(SnowflakeId.generateId());
|
||||
sessionRecordLog.setServer_location(ipLookup.countryLookup(serverIp));
|
||||
sessionRecordLog.setClient_location(ipLookup.cityLookupDetail(clientIp));
|
||||
sessionRecordLog.setClient_asn(ipLookup.asnLookup(clientIp, true));
|
||||
sessionRecordLog.setServer_asn(ipLookup.asnLookup(serverIp, true));
|
||||
sessionRecordLog.setRecv_time(System.currentTimeMillis() / 1000);
|
||||
return JSONObject.toJSONString(sessionRecordLog);
|
||||
} catch (Exception e) {
|
||||
logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常", e);
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 有sni通过sni获取域名,有hots根据host获取域名
|
||||
*
|
||||
@@ -67,7 +92,7 @@ public class TransFormUtils {
|
||||
* @param host host
|
||||
* @return 顶级域名
|
||||
*/
|
||||
private static String getTopDomain(String sni, String host) {
|
||||
public static String getTopDomain(String sni, String host) {
|
||||
if (StringUtil.isNotBlank(sni)) {
|
||||
return getDomain(sni);
|
||||
} else if (StringUtil.isNotBlank(host)) {
|
||||
|
||||
124
src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java
Normal file
124
src/main/java/cn/ac/iie/utils/hbase/HbaseUtils.java
Normal file
@@ -0,0 +1,124 @@
|
||||
package cn.ac.iie.utils.hbase;
|
||||
|
||||
import cn.ac.iie.common.FlowWriteConfig;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author qidaijie
|
||||
*/
|
||||
public class HbaseUtils {
|
||||
private final static Logger logger = Logger.getLogger(HbaseUtils.class);
|
||||
private static Map<String, String> subIdMap = new HashMap<>(16);
|
||||
private static Connection connection;
|
||||
private static Long time;
|
||||
|
||||
// static {
|
||||
// // 管理Hbase的配置信息
|
||||
// Configuration configuration = HBaseConfiguration.create();
|
||||
// // 设置zookeeper节点
|
||||
// configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
|
||||
// try {
|
||||
// connection = ConnectionFactory.createConnection(configuration);
|
||||
// time = System.currentTimeMillis();
|
||||
// getAll();
|
||||
// } catch (IOException e) {
|
||||
// logger.error("获取HBase连接失败");
|
||||
// e.printStackTrace();
|
||||
// }
|
||||
// }
|
||||
|
||||
/**
|
||||
* 更新变量
|
||||
*/
|
||||
public static void change() {
|
||||
Long nowTime = System.currentTimeMillis();
|
||||
timestampsFilter(time - 1500, nowTime + 500);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取变更内容
|
||||
*
|
||||
* @param startTime 开始时间
|
||||
* @param endTime 结束时间
|
||||
*/
|
||||
private static void timestampsFilter(Long startTime, Long endTime) {
|
||||
Table table = null;
|
||||
TableName tableName = TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME);
|
||||
Admin admin = null;
|
||||
try {
|
||||
table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
|
||||
Scan scan2 = new Scan();
|
||||
scan2.setTimeRange(startTime, endTime);
|
||||
ResultScanner scanner = table.getScanner(scan2);
|
||||
for (Result result : scanner) {
|
||||
Cell[] cells = result.rawCells();
|
||||
for (Cell cell : cells) {
|
||||
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
|
||||
}
|
||||
}
|
||||
admin = connection.getAdmin();
|
||||
admin.flush(tableName);
|
||||
logger.warn("当前集合长度" + subIdMap.keySet().size());
|
||||
logger.warn("更新后集合keys:" + subIdMap.keySet());
|
||||
time = endTime;
|
||||
scanner.close();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
if (table != null) {
|
||||
try {
|
||||
table.close();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有的 key value
|
||||
*/
|
||||
private static void getAll() {
|
||||
try {
|
||||
Table table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
|
||||
Scan scan2 = new Scan();
|
||||
ResultScanner scanner = table.getScanner(scan2);
|
||||
for (Result result : scanner) {
|
||||
Cell[] cells = result.rawCells();
|
||||
for (Cell cell : cells) {
|
||||
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
|
||||
}
|
||||
}
|
||||
logger.warn("获取全量后集合keys:" + subIdMap.keySet());
|
||||
scanner.close();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 account
|
||||
*
|
||||
* @param ip
|
||||
* @return
|
||||
*/
|
||||
public static String getAccount(String ip) {
|
||||
if (StringUtil.isNotBlank(ip)) {
|
||||
return subIdMap.get(ip);
|
||||
} else {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -59,7 +59,7 @@ public class KafkaLogNtc {
|
||||
}
|
||||
}
|
||||
kafkaProducer.flush();
|
||||
logger.warn("Log sent to National Center successfully!!!!!");
|
||||
logger.debug("Log sent to National Center successfully!!!!!");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -75,7 +75,7 @@ public class KafkaLogNtc {
|
||||
properties.put("request.timeout.ms", 60000);
|
||||
properties.put("batch.size", 262144);
|
||||
properties.put("buffer.memory", 33554432);
|
||||
properties.put("compression.type", "snappy");
|
||||
// properties.put("compression.type", "snappy");
|
||||
kafkaProducer = new KafkaProducer<>(properties);
|
||||
}
|
||||
|
||||
|
||||
@@ -43,7 +43,7 @@ public class RedisPollUtils {
|
||||
poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONBORROW)));
|
||||
// 根据配置实例化jedis池
|
||||
jedisPool = new JedisPool(poolConfig, props.getProperty(FlowWriteConfig.REDIS_IP),
|
||||
Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_PORT)));
|
||||
Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_PORT)), 3000, "123456");
|
||||
} catch (Exception e) {
|
||||
logger.error("Redis连接池初始化错误", e);
|
||||
}
|
||||
@@ -96,19 +96,15 @@ public class RedisPollUtils {
|
||||
// return workId;
|
||||
// }
|
||||
|
||||
public static Integer getWorkerId(String key) {
|
||||
int workId = 0;
|
||||
public static String getWorkerId(String key) {
|
||||
String sub = "";
|
||||
try (Jedis jedis = RedisPollUtils.getJedis()) {
|
||||
if (jedis != null) {
|
||||
workId = Integer.parseInt(jedis.get(key));
|
||||
jedis.set(key, String.valueOf(workId + 2));
|
||||
logger.error("\n工作id是:" + workId + "\n");
|
||||
}
|
||||
sub = jedis.get(key);
|
||||
} catch (Exception e) {
|
||||
logger.error("通过Redis获取用户名出现异常", e);
|
||||
workId = RandomUtils.nextInt(0, 31);
|
||||
|
||||
}
|
||||
return workId;
|
||||
return sub;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package cn.ac.iie.utils.system;
|
||||
|
||||
import cn.ac.iie.common.FlowWriteConfig;
|
||||
import cn.ac.iie.utils.zookeeper.DistributedLock;
|
||||
import cn.ac.iie.utils.zookeeper.ZooKeeperLock;
|
||||
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
@@ -98,19 +99,40 @@ public class SnowflakeId {
|
||||
/**
|
||||
* 构造函数
|
||||
*/
|
||||
// private SnowflakeId() {
|
||||
// DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
|
||||
// lock.lock();
|
||||
// int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
|
||||
// if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
|
||||
// throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
|
||||
// }
|
||||
// int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
|
||||
// if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
|
||||
// throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
|
||||
// }
|
||||
// this.workerId = tmpWorkerId;
|
||||
// this.dataCenterId = dataCenterId;
|
||||
// }
|
||||
|
||||
private SnowflakeId() {
|
||||
DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
|
||||
lock.lock();
|
||||
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
|
||||
if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
|
||||
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
|
||||
ZooKeeperLock lock = new ZooKeeperLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "/locks", "disLocks");
|
||||
if (lock.lock()) {
|
||||
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
|
||||
if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
|
||||
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
|
||||
}
|
||||
int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
|
||||
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
|
||||
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
|
||||
}
|
||||
this.workerId = tmpWorkerId;
|
||||
this.dataCenterId = dataCenterId;
|
||||
try {
|
||||
lock.unlock();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
|
||||
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
|
||||
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
|
||||
}
|
||||
this.workerId = tmpWorkerId;
|
||||
this.dataCenterId = dataCenterId;
|
||||
}
|
||||
|
||||
// ==============================Methods==========================================
|
||||
|
||||
@@ -190,27 +190,14 @@ public class DistributedLock implements Lock, Watcher {
|
||||
|
||||
public static void main(String[] args) {
|
||||
ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
|
||||
|
||||
Runnable runnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
DistributedLock lock = null;
|
||||
try {
|
||||
lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
|
||||
lock.lock();
|
||||
// System.out.println(SnowflakeId.generateId());
|
||||
System.out.println(1);
|
||||
Thread.sleep(3000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
if (lock != null) {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
|
||||
lock.lock();
|
||||
lock.unlock();
|
||||
}
|
||||
};
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Thread t = new Thread(runnable);
|
||||
t.start();
|
||||
|
||||
170
src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java
Normal file
170
src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java
Normal file
@@ -0,0 +1,170 @@
|
||||
package cn.ac.iie.utils.zookeeper;
|
||||
|
||||
import org.apache.zookeeper.*;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ZooKeeperLock implements Watcher {
|
||||
private ZooKeeper zk = null;
|
||||
private String rootLockNode; // 锁的根节点
|
||||
private String lockName; // 竞争资源,用来生成子节点名称
|
||||
private String currentLock; // 当前锁
|
||||
private String waitLock; // 等待的锁(前一个锁)
|
||||
private CountDownLatch countDownLatch; // 计数器(用来在加锁失败时阻塞加锁线程)
|
||||
private int sessionTimeout = 30000; // 超时时间
|
||||
|
||||
// 1. 构造器中创建ZK链接,创建锁的根节点
|
||||
public ZooKeeperLock(String zkAddress, String rootLockNode, String lockName) {
|
||||
this.rootLockNode = rootLockNode;
|
||||
this.lockName = lockName;
|
||||
try {
|
||||
// 创建连接,zkAddress格式为:IP:PORT
|
||||
zk = new ZooKeeper(zkAddress, this.sessionTimeout, this);
|
||||
// 检测锁的根节点是否存在,不存在则创建
|
||||
Stat stat = zk.exists(rootLockNode, false);
|
||||
if (null == stat) {
|
||||
zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||
}
|
||||
} catch (IOException | InterruptedException | KeeperException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
// 2. 加锁方法,先尝试加锁,不能加锁则等待上一个锁的释放
|
||||
public boolean lock() {
|
||||
if (this.tryLock()) {
|
||||
System.out.println("线程【" + Thread.currentThread().getName() + "】加锁(" + this.currentLock + ")成功!");
|
||||
return true;
|
||||
} else {
|
||||
return waitOtherLock(this.waitLock, this.sessionTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean tryLock() {
|
||||
// 分隔符
|
||||
String split = "_lock_";
|
||||
if (this.lockName.contains("_lock_")) {
|
||||
throw new RuntimeException("lockName can't contains '_lock_' ");
|
||||
}
|
||||
try {
|
||||
// 创建锁节点(临时有序节点)
|
||||
this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0],
|
||||
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
|
||||
System.out.println("线程【" + Thread.currentThread().getName()
|
||||
+ "】创建锁节点(" + this.currentLock + ")成功,开始竞争...");
|
||||
// 取所有子节点
|
||||
List<String> nodes = zk.getChildren(this.rootLockNode, false);
|
||||
// 取所有竞争lockName的锁
|
||||
List<String> lockNodes = new ArrayList<String>();
|
||||
for (String nodeName : nodes) {
|
||||
if (nodeName.split(split)[0].equals(this.lockName)) {
|
||||
lockNodes.add(nodeName);
|
||||
}
|
||||
}
|
||||
Collections.sort(lockNodes);
|
||||
// 取最小节点与当前锁节点比对加锁
|
||||
String currentLockPath = this.rootLockNode + "/" + lockNodes.get(0);
|
||||
if (this.currentLock.equals(currentLockPath)) {
|
||||
return true;
|
||||
}
|
||||
// 加锁失败,设置前一节点为等待锁节点
|
||||
String currentLockNode = this.currentLock.substring(this.currentLock.lastIndexOf("/") + 1);
|
||||
int preNodeIndex = Collections.binarySearch(lockNodes, currentLockNode) - 1;
|
||||
this.waitLock = lockNodes.get(preNodeIndex);
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean waitOtherLock(String waitLock, int sessionTimeout) {
|
||||
boolean islock = false;
|
||||
try {
|
||||
// 监听等待锁节点
|
||||
String waitLockNode = this.rootLockNode + "/" + waitLock;
|
||||
Stat stat = zk.exists(waitLockNode, true);
|
||||
if (null != stat) {
|
||||
System.out.println("线程【" + Thread.currentThread().getName()
|
||||
+ "】锁(" + this.currentLock + ")加锁失败,等待锁(" + waitLockNode + ")释放...");
|
||||
// 设置计数器,使用计数器阻塞线程
|
||||
this.countDownLatch = new CountDownLatch(1);
|
||||
islock = this.countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);
|
||||
this.countDownLatch = null;
|
||||
if (islock) {
|
||||
System.out.println("线程【" + Thread.currentThread().getName() + "】锁("
|
||||
+ this.currentLock + ")加锁成功,锁(" + waitLockNode + ")已经释放");
|
||||
} else {
|
||||
System.out.println("线程【" + Thread.currentThread().getName() + "】锁("
|
||||
+ this.currentLock + ")加锁失败...");
|
||||
}
|
||||
} else {
|
||||
islock = true;
|
||||
}
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return islock;
|
||||
}
|
||||
|
||||
// 3. 释放锁
|
||||
public void unlock() throws InterruptedException {
|
||||
try {
|
||||
Stat stat = zk.exists(this.currentLock, false);
|
||||
if (null != stat) {
|
||||
System.out.println("线程【" + Thread.currentThread().getName() + "】释放锁 " + this.currentLock);
|
||||
zk.delete(this.currentLock, -1);
|
||||
this.currentLock = null;
|
||||
}
|
||||
} catch (InterruptedException | KeeperException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
zk.close();
|
||||
}
|
||||
}
|
||||
|
||||
// 4. 监听器回调
|
||||
@Override
|
||||
public void process(WatchedEvent watchedEvent) {
|
||||
if (null != this.countDownLatch && watchedEvent.getType() == Event.EventType.NodeDeleted) {
|
||||
// 计数器减一,恢复线程操作
|
||||
this.countDownLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void doSomething() {
|
||||
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
Runnable runnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ZooKeeperLock lock = null;
|
||||
lock = new ZooKeeperLock("192.168.40.119:2181", "/locks", "test1");
|
||||
if (lock.lock()) {
|
||||
doSomething();
|
||||
try {
|
||||
// Thread.sleep(1000);
|
||||
lock.unlock();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Thread t = new Thread(runnable);
|
||||
t.start();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user