From 475f28cabc12c91289a924267c25f6d8bd6a1346 Mon Sep 17 00:00:00 2001 From: lee Date: Wed, 12 Feb 2020 16:32:26 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B7=A5=E5=85=B7=E7=B1=BB=E6=89=93=E5=8C=85?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- properties/service_flow_config.properties | 4 +- .../bolt/collect/CollectCompletedBolt.java | 2 +- .../iie/bolt/proxy/ProxyCompletionBolt.java | 5 +- .../bolt/security/SecurityCompletionBolt.java | 2 +- .../cn/ac/iie/utils/general/CompleteUtil.java | 143 ------------- .../ac/iie/utils/general/TransFormUtils.java | 40 ++-- .../cn/ac/iie/utils/hbase/HBaseUtils.java | 184 ----------------- .../cn/ac/iie/utils/system/SnowflakeId.java | 190 ------------------ .../ac/iie/utils/zookeeper/ZooKeeperLock.java | 140 ------------- .../iie/utils/zookeeper/ZookeeperUtils.java | 136 ------------- .../java/cn/ac/iie/test/DistributedLock.java | 4 +- 12 files changed, 31 insertions(+), 821 deletions(-) delete mode 100644 src/main/java/cn/ac/iie/utils/general/CompleteUtil.java delete mode 100644 src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java delete mode 100644 src/main/java/cn/ac/iie/utils/system/SnowflakeId.java delete mode 100644 src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java delete mode 100644 src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java diff --git a/pom.xml b/pom.xml index e8a81ee..273a18c 100644 --- a/pom.xml +++ b/pom.xml @@ -153,7 +153,7 @@ com.zdjizhi galaxy - 1.0.1 + 1.0.2 slf4j-log4j12 diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 90b80e1..9336ca9 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -29,13 +29,13 @@ results.output.topic=SECURITY-EVENT-COMPLETED-LOG topology.workers=1 #spout并行度 建议与kafka分区数相同 -spout.parallelism=3 +spout.parallelism=1 #处理补全操作的bolt并行度-worker的倍数 datacenter.bolt.parallelism=1 #写入kafka的并行度10 -kafka.bolt.parallelism=3 +kafka.bolt.parallelism=1 #定位库地址 #ip.library=/home/ceiec/topology/dat/ diff --git a/src/main/java/cn/ac/iie/bolt/collect/CollectCompletedBolt.java b/src/main/java/cn/ac/iie/bolt/collect/CollectCompletedBolt.java index 5f6ab55..a7c4162 100644 --- a/src/main/java/cn/ac/iie/bolt/collect/CollectCompletedBolt.java +++ b/src/main/java/cn/ac/iie/bolt/collect/CollectCompletedBolt.java @@ -1,8 +1,8 @@ package cn.ac.iie.bolt.collect; import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.hbase.HBaseUtils; import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.utils.HBaseUtils; import com.zdjizhi.utils.StringUtil; import org.apache.log4j.Logger; import org.apache.storm.task.TopologyContext; diff --git a/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java index 56f8f8b..a2783ee 100644 --- a/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java +++ b/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java @@ -1,8 +1,8 @@ package cn.ac.iie.bolt.proxy; import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.hbase.HBaseUtils; import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.utils.HBaseUtils; import com.zdjizhi.utils.StringUtil; import org.apache.log4j.Logger; import org.apache.storm.task.TopologyContext; @@ -17,7 +17,6 @@ import java.util.HashMap; import java.util.Map; import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage; -import static cn.ac.iie.utils.hbase.HBaseUtils.change; /** * 通联关系日志补全 @@ -38,7 +37,7 @@ public class ProxyCompletionBolt extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { try { if (TupleUtils.isTick(tuple)) { - change(); + HBaseUtils.change(); } else { String message = tuple.getString(0); if (StringUtil.isNotBlank(message)) { diff --git a/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java index 7a4182e..09cd305 100644 --- a/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java +++ b/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java @@ -1,8 +1,8 @@ package cn.ac.iie.bolt.security; import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.hbase.HBaseUtils; import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.utils.HBaseUtils; import com.zdjizhi.utils.StringUtil; import org.apache.log4j.Logger; import org.apache.storm.task.TopologyContext; diff --git a/src/main/java/cn/ac/iie/utils/general/CompleteUtil.java b/src/main/java/cn/ac/iie/utils/general/CompleteUtil.java deleted file mode 100644 index b786a4a..0000000 --- a/src/main/java/cn/ac/iie/utils/general/CompleteUtil.java +++ /dev/null @@ -1,143 +0,0 @@ -package cn.ac.iie.utils.general; - -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.hbase.HBaseUtils; -import cn.ac.iie.utils.system.SnowflakeId; -import com.google.common.net.InternetDomainName; -import com.zdjizhi.utils.IpLookup; -import com.zdjizhi.utils.StringUtil; -import org.apache.log4j.Logger; - -import java.util.Base64; - - -public final class CompleteUtil { - - private static Logger logger = Logger.getLogger(TransFormUtils.class); - private static IpLookup ipLookup = new IpLookup.Builder(false) - .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") - .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") - .loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb") - .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb") - .build(); - - /** - * 有sni通过sni获取域名,有host根据host获取域名 - * - * @param sni sni - * @param host host - * @return 顶级域名 - */ - public static String getTopDomain(String sni, String host) { - if (StringUtil.isNotBlank(host)) { - return getDomainName(host); - } else if (StringUtil.isNotBlank(sni)) { - return getDomainName(sni); - } else { - return ""; - } - } - - - /** - * 根据url截取顶级域名 - * - * @param host 网站url - * @return 顶级域名 - */ - public static String getDomainName(String host) { - String domain = ""; - try { - domain = InternetDomainName.from(host).topPrivateDomain().toString(); - } catch (Exception e) { - logger.error("host解析顶级域名异常: " + e.getMessage()); - } - return domain; - } - - /** - * 生成当前时间戳的操作 - */ - public static int getCurrentTime() { - return (int)(System.currentTimeMillis() / 1000); - } - - /** - * 雪花模型生成id - * - * @return - */ - public static long getSnowflakeId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) { - - return SnowflakeId.generateId(zookeeperIp,kafkaTopic,dataCenterIdNum); - } - - /** - * 根据clientIp获取location信息 - * - * @param ip - * @return - */ - public static String getGeoIpDetail(String ip) { - - return ipLookup.cityLookupDetail(ip); - } - - /** - * 根据ip获取asn信息 - * - * @param ip - * @return asn - */ - public static String getGeoAsn(String ip) { - - return ipLookup.asnLookup(ip, true); - } - - /** - * 根据ip获取country信息 - * - * @param ip - * @return country - */ - public static String getGeoIpCountry(String ip) { - - return ipLookup.countryLookup(ip); - } - - /** - * 根据ip去hbase中匹配对应的用户名 - * @param clientIp - * @param hbaseZookeeper - * @param hbaseTable - * @return 用户名 subscriber_id - */ - public static String radiusMatch(String clientIp, String hbaseZookeeper, String hbaseTable) { - return HBaseUtils.getAccount(clientIp,hbaseZookeeper,hbaseTable); - } - - - /** - * base64 解码 - * - * @param encodedText mail subject - * @param subjectCharset 编码格式 - * @return 解码内容 / 空 - */ - public static String base64Str(String encodedText, String subjectCharset) { - Base64.Decoder decoder = Base64.getDecoder(); - String sub; - try { - if (StringUtil.isNotBlank(subjectCharset)) { - sub = new String(decoder.decode(encodedText), subjectCharset); - } else { - sub = new String(decoder.decode(encodedText), "UTF-8"); - } - return sub; - } catch (Exception e) { - logger.error("transform base64 String failed! base64Str = " + encodedText + "Charset = " + subjectCharset + "error :" + e); - return ""; - } - } - -} diff --git a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java index fb028eb..572cf78 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java +++ b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java @@ -2,20 +2,16 @@ package cn.ac.iie.utils.general; import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.hbase.HBaseUtils; import cn.ac.iie.utils.json.JsonParseUtil; -import cn.ac.iie.utils.system.SnowflakeId; import com.alibaba.fastjson.JSONObject; -import com.google.common.net.InternetDomainName; +import com.zdjizhi.utils.FormatUtils; import com.zdjizhi.utils.IpLookup; import com.zdjizhi.utils.StringUtil; import org.apache.log4j.Logger; import org.junit.jupiter.api.Test; import java.util.*; -import java.util.regex.Pattern; -import static cn.ac.iie.utils.general.CompleteUtil.*; /** @@ -28,6 +24,16 @@ import static cn.ac.iie.utils.general.CompleteUtil.*; public class TransFormUtils { private static Logger logger = Logger.getLogger(TransFormUtils.class); +// private static IpLookup ipLookup = FormatUtils.getIpLookup(FlowWriteConfig.IP_LIBRARY); + + private static IpLookup ipLookup = new IpLookup.Builder(false) + .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") + .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") + .loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb") + .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb") + .build(); + + //在内存中加载反射类用的map private static HashMap map = JsonParseUtil.getMapFromhttp(FlowWriteConfig.SCHEMA_HTTP); //反射成一个类 @@ -43,8 +49,6 @@ public class TransFormUtils { */ public static String dealCommonMessage(String message) { - - Object object = JSONObject.parseObject(message, mapObject.getClass()); // System.out.println("补全之前 ===》 "+JSON.toJSONString(object)); @@ -64,26 +68,26 @@ public class TransFormUtils { if (function.equals("current_timestamp")) { - JsonParseUtil.setValue(object, strings[1], getCurrentTime()); + JsonParseUtil.setValue(object, strings[1], FormatUtils.getCurrentTime()); } else if (function.equals("snowflake_id")) { - JsonParseUtil.setValue(object, strings[1], getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS,FlowWriteConfig.KAFKA_TOPIC,FlowWriteConfig.DATA_CENTER_ID_NUM)); + JsonParseUtil.setValue(object, strings[1], FormatUtils.getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS,FlowWriteConfig.KAFKA_TOPIC,FlowWriteConfig.DATA_CENTER_ID_NUM)); } else if (function.equals("geo_ip_detail")) { - JsonParseUtil.setValue(object, strings[1], getGeoIpDetail(name.toString())); + JsonParseUtil.setValue(object, strings[1], FormatUtils.getGeoIpDetail(name.toString(),ipLookup)); } else if (function.equals("geo_asn")) { - JsonParseUtil.setValue(object, strings[1], getGeoAsn(name.toString())); + JsonParseUtil.setValue(object, strings[1], FormatUtils.getGeoAsn(name.toString(),ipLookup)); } else if (function.equals("radius_match")) { - JsonParseUtil.setValue(object, strings[1], radiusMatch(name.toString(),FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS,FlowWriteConfig.HBASE_TABLE_NAME)); + JsonParseUtil.setValue(object, strings[1], FormatUtils.radiusMatch(name.toString(),FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS,FlowWriteConfig.HBASE_TABLE_NAME)); } else if (function.equals("geo_ip_country")) { - JsonParseUtil.setValue(object, strings[1], getGeoIpCountry(name.toString())); + JsonParseUtil.setValue(object, strings[1], FormatUtils.getGeoIpCountry(name.toString(),ipLookup)); } else if (function.equals("decode_of_base64") && param != null){ - JsonParseUtil.setValue(object, strings[1], base64Str(name.toString(),param.toString())); + JsonParseUtil.setValue(object, strings[1], FormatUtils.base64Str(name.toString(),param.toString())); } else if (name.equals("http_host") && function.equals("sub_domain")) { if (appendTo == null || StringUtil.isBlank(appendTo.toString())) { - JsonParseUtil.setValue(object, strings[1], getTopDomain(null, name.toString())); + JsonParseUtil.setValue(object, strings[1], FormatUtils.getTopDomain(null, name.toString())); } } else if (name.equals("ssl_sni") && strings[2].equals("sub_domain")) { if (appendTo == null || StringUtil.isBlank(appendTo.toString())) { - JsonParseUtil.setValue(object, strings[1], getTopDomain(name.toString(), null)); + JsonParseUtil.setValue(object, strings[1], FormatUtils.getTopDomain(name.toString(), null)); } } @@ -105,8 +109,8 @@ public class TransFormUtils { @Test public void aaa() { String sni = "www.baidu.com"; - System.out.println(getTopDomain(sni, null)); - System.out.println(getTopDomain(null,sni)); + System.out.println(FormatUtils.getTopDomain(sni, null)); + System.out.println(FormatUtils.getTopDomain(null,sni)); } } \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java deleted file mode 100644 index 5c0e4c9..0000000 --- a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java +++ /dev/null @@ -1,184 +0,0 @@ -package cn.ac.iie.utils.hbase; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; - -public class HBaseUtils { - private final static Logger logger = Logger.getLogger(HBaseUtils.class); - private static Map subIdMap = new HashMap<>(333334); - private static Connection connection; - private static Long time; - - private static String zookeeperIp; - private static String hbaseTable; - - private static HBaseUtils hBaseUtils; - - private static void getHbaseInstance(String zookeeperServer, String hbaseTableName) { - hBaseUtils = new HBaseUtils(zookeeperServer, hbaseTableName); - } - - /** - * 构造函数-新-20191023 - */ - public HBaseUtils(String zookeeperServer, String hbaseTableName) { - zookeeperIp = zookeeperServer; - hbaseTable = hbaseTableName; - //获取连接 - getHbaseConn(); - //拉取所有 - getAll(); - //定时更新 - updateHabaseCache(); - } - - private static void getHbaseConn() { - try { - // 管理Hbase的配置信息 - Configuration configuration = HBaseConfiguration.create(); - // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", zookeeperIp); - configuration.set("hbase.client.retries.number", "3"); - configuration.set("hbase.bulkload.retries.number", "3"); - configuration.set("zookeeper.recovery.retry", "3"); - connection = ConnectionFactory.createConnection(configuration); - time = System.currentTimeMillis(); - logger.warn("HBaseUtils get HBase connection,now to getAll()."); - } catch (IOException ioe) { - logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<==="); - ioe.printStackTrace(); - } catch (Exception e) { - logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<==="); - e.printStackTrace(); - } - } - - /** - * 更新变量 - */ - public static void change() { - long nowTime = System.currentTimeMillis(); - timestampsFilter(time - 1000, nowTime + 500); - } - - /** - * 获取变更内容 - * - * @param startTime 开始时间 - * @param endTime 结束时间 - */ - private static void timestampsFilter(Long startTime, Long endTime) { - Long begin = System.currentTimeMillis(); - Table table = null; - ResultScanner scanner = null; - Scan scan2 = new Scan(); - try { - table = connection.getTable(TableName.valueOf("sub:" + hbaseTable)); - scan2.setTimeRange(startTime, endTime); - scanner = table.getScanner(scan2); - for (Result result : scanner) { - Cell[] cells = result.rawCells(); - for (Cell cell : cells) { - String key = Bytes.toString(CellUtil.cloneRow(cell)); - String value = Bytes.toString(CellUtil.cloneValue(cell)); - if (subIdMap.containsKey(key)) { - if (!value.equals(subIdMap.get(key))) { - subIdMap.put(key, value); - } - } else { - subIdMap.put(key, value); - } - } - } - Long end = System.currentTimeMillis(); - logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size()); - logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + begin + ",EndTime: " + end); - time = endTime; - } catch (IOException ioe) { - logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<==="); - ioe.printStackTrace(); - } catch (Exception e) { - logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<==="); - e.printStackTrace(); - } finally { - if (scanner != null) { - scanner.close(); - } - if (table != null) { - try { - table.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } - - /** - * 获取所有的 key value - */ - private static void getAll() { - long begin = System.currentTimeMillis(); - try { - Table table = connection.getTable(TableName.valueOf("sub:" + hbaseTable)); - Scan scan2 = new Scan(); - ResultScanner scanner = table.getScanner(scan2); - for (Result result : scanner) { - Cell[] cells = result.rawCells(); - for (Cell cell : cells) { - subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); - } - } - logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size()); - logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin)); - scanner.close(); - } catch (IOException ioe) { - logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<==="); - ioe.printStackTrace(); - } catch (Exception e) { - logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<==="); - e.printStackTrace(); - } - } - - /** - * 获取 account - * - * @param clientIp client_ip - * @return account - */ - public static String getAccount(String clientIp, String hbaseZookeeper, String hbaseTable) { - if (hBaseUtils == null) { - getHbaseInstance(hbaseZookeeper, hbaseTable); - } - return subIdMap.get(clientIp); - } - - private void updateHabaseCache() { - Timer timer = new Timer(); - timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - try { - change(); - } catch (Exception e) { - logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<==="); - e.printStackTrace(); - } - } - }, 1, 1000 * 60); - } - -} diff --git a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java deleted file mode 100644 index 8d67348..0000000 --- a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java +++ /dev/null @@ -1,190 +0,0 @@ -package cn.ac.iie.utils.system; - -import cn.ac.iie.utils.zookeeper.ZooKeeperLock; -import cn.ac.iie.utils.zookeeper.ZookeeperUtils; -import org.apache.log4j.Logger; - - -public class SnowflakeId { - private static Logger logger = Logger.getLogger(SnowflakeId.class); - - /** - * 开始时间截 (2018-08-01 00:00:00) max 17years - */ - private final long twepoch = 1564588800000L; - - /** - * 机器id所占的位数 - */ - private final long workerIdBits = 6L; - - /** - * 数据标识id所占的位数 - */ - private final long dataCenterIdBits = 4L; - - /** - * 支持的最大机器id,结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) - * M << n = M * 2^n - */ - private final long maxWorkerId = -1L ^ (-1L << workerIdBits); - - /** - * 支持的最大数据标识id,结果是15 - */ - private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits); - - /** - * 序列在id中占的位数 - */ - private final long sequenceBits = 14L; - - /** - * 机器ID向左移12位 - */ - private final long workerIdShift = sequenceBits; - - /** - * 数据标识id向左移17位(14+6) - */ - private final long dataCenterIdShift = sequenceBits + workerIdBits; - - /** - * 时间截向左移22位(4+6+14) - */ - private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits; - - /** - * 生成序列的掩码,这里为16383 - */ - private final long sequenceMask = -1L ^ (-1L << sequenceBits); - - /** - * 工作机器ID(0~63) - */ - private long workerId; - - /** - * 数据中心ID(0~15) - */ - private long dataCenterId; - - /** - * 毫秒内序列(0~16383) - */ - private long sequence = 0L; - - /** - * 上次生成ID的时间截 - */ - private long lastTimestamp = -1L; - - - private static SnowflakeId idWorker; - - private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); - - private static void getSnowflakeldInstance(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) { - idWorker = new SnowflakeId(zookeeperIp, kafkaTopic, dataCenterIdNum); - } - - /** - * 构造函数 - */ - private SnowflakeId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) { - ZooKeeperLock lock = new ZooKeeperLock(zookeeperIp, "/locks", "disLocks1"); - if (lock.lock()) { - int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + kafkaTopic, zookeeperIp); - if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { - throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); - } - if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) { - throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId)); - } - this.workerId = tmpWorkerId; - this.dataCenterId = dataCenterIdNum; - try { - lock.unlock(); - } catch (InterruptedException ie) { - ie.printStackTrace(); - } catch (Exception e) { - e.printStackTrace(); - logger.error("This is not usual error!!!===>>>" + e + "<<<==="); - } - } - } - - /** - * 获得下一个ID (该方法是线程安全的) - * - * @return SnowflakeId - */ - private synchronized long nextId() { - long timestamp = timeGen(); - - //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 - if (timestamp < lastTimestamp) { - throw new RuntimeException( - String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); - } - - //如果是同一时间生成的,则进行毫秒内序列 - if (lastTimestamp == timestamp) { - sequence = (sequence + 1) & sequenceMask; - //毫秒内序列溢出 - if (sequence == 0) { - //阻塞到下一个毫秒,获得新的时间戳 - timestamp = tilNextMillis(lastTimestamp); - } - } - //时间戳改变,毫秒内序列重置 - else { - sequence = 0L; - } - - //上次生成ID的时间截 - lastTimestamp = timestamp; - - //移位并通过或运算拼到一起组成64位的ID - return ((timestamp - twepoch) << timestampLeftShift) - | (dataCenterId << dataCenterIdShift) - | (workerId << workerIdShift) - | sequence; - } - - /** - * 阻塞到下一个毫秒,直到获得新的时间戳 - * - * @param lastTimestamp 上次生成ID的时间截 - * @return 当前时间戳 - */ - protected long tilNextMillis(long lastTimestamp) { - long timestamp = timeGen(); - while (timestamp <= lastTimestamp) { - timestamp = timeGen(); - } - return timestamp; - } - - /** - * 返回以毫秒为单位的当前时间 - * - * @return 当前时间(毫秒) - */ - protected long timeGen() { - return System.currentTimeMillis(); - } - - - /** - * 静态工具类 - * - * @return - */ - public static Long generateId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) { - if (idWorker == null) { - getSnowflakeldInstance(zookeeperIp, kafkaTopic, dataCenterIdNum); - } - return idWorker.nextId(); - } -} \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java b/src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java deleted file mode 100644 index 8d0a4d5..0000000 --- a/src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java +++ /dev/null @@ -1,140 +0,0 @@ -package cn.ac.iie.utils.zookeeper; - -import org.apache.log4j.Logger; -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class ZooKeeperLock implements Watcher { - private static Logger logger = Logger.getLogger(ZooKeeperLock.class); - - private ZooKeeper zk = null; - private String rootLockNode; // 锁的根节点 - private String lockName; // 竞争资源,用来生成子节点名称 - private String currentLock; // 当前锁 - private String waitLock; // 等待的锁(前一个锁) - private CountDownLatch countDownLatch; // 计数器(用来在加锁失败时阻塞加锁线程) - private int sessionTimeout = 30000; // 超时时间 - - // 1. 构造器中创建ZK链接,创建锁的根节点 - public ZooKeeperLock(String zkAddress, String rootLockNode, String lockName) { - this.rootLockNode = rootLockNode; - this.lockName = lockName; - try { - // 创建连接,zkAddress格式为:IP:PORT - zk = new ZooKeeper(zkAddress, this.sessionTimeout, this); - // 检测锁的根节点是否存在,不存在则创建 - Stat stat = zk.exists(rootLockNode, false); - if (null == stat) { - zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } catch (IOException | InterruptedException | KeeperException e) { - e.printStackTrace(); - logger.error("ZooKeeperLock Constructors ===>>>Node already exists!"); - } - } - - // 2. 加锁方法,先尝试加锁,不能加锁则等待上一个锁的释放 - public boolean lock() { - if (this.tryLock()) { - logger.warn("ZooKeeperLock method lock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] addZkLock(" + this.currentLock + ")success!"); - return true; - } else { - return waitOtherLock(this.waitLock, this.sessionTimeout); - } - } - - public boolean tryLock() { - // 分隔符 - String split = "_lock_"; - if (this.lockName.contains("_lock_")) { - throw new RuntimeException("lockName can't contains '_lock_' "); - } - try { - // 创建锁节点(临时有序节点) - this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); - logger.warn("ZooKeeperLock method tryLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] create zkLockNode(" + this.currentLock + ")success,begin to election..."); - // 取所有子节点 - List nodes = zk.getChildren(this.rootLockNode, false); - // 取所有竞争lockName的锁 - List lockNodes = new ArrayList(); - for (String nodeName : nodes) { - if (nodeName.split(split)[0].equals(this.lockName)) { - lockNodes.add(nodeName); - } - } - Collections.sort(lockNodes); - // 取最小节点与当前锁节点比对加锁 - String currentLockPath = this.rootLockNode + "/" + lockNodes.get(0); - if (this.currentLock.equals(currentLockPath)) { - return true; - } - // 加锁失败,设置前一节点为等待锁节点 - String currentLockNode = this.currentLock.substring(this.currentLock.lastIndexOf("/") + 1); - int preNodeIndex = Collections.binarySearch(lockNodes, currentLockNode) - 1; - this.waitLock = lockNodes.get(preNodeIndex); - } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); - } - return false; - } - - private boolean waitOtherLock(String waitLock, int sessionTimeout) { - boolean islock = false; - try { - // 监听等待锁节点 - String waitLockNode = this.rootLockNode + "/" + waitLock; - Stat stat = zk.exists(waitLockNode, true); - if (null != stat) { - logger.error("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] zkLock(" + this.currentLock + ")addZkLock fail,wait lock(" + waitLockNode + ")release..."); - // 设置计数器,使用计数器阻塞线程 - this.countDownLatch = new CountDownLatch(1); - islock = this.countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS); - this.countDownLatch = null; - if (islock) { - logger.warn("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] zkLock(" + this.currentLock + ")addZkLock success,lock(" + waitLockNode + ")release over."); - } else { - logger.error("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] zkLock(" + this.currentLock + "addZkLock fail..."); - } - } else { - islock = true; - } - } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); - } - return islock; - } - - // 3. 释放锁 - public void unlock() throws InterruptedException { - try { - Stat stat = zk.exists(this.currentLock, false); - if (null != stat) { - logger.warn("ZooKeeperLock method unlock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] free zkLock " + this.currentLock); - zk.delete(this.currentLock, -1); - this.currentLock = null; - } - } catch (InterruptedException | KeeperException e) { - e.printStackTrace(); - } finally { - zk.close(); - } - } - - // 4. 监听器回调 - @Override - public void process(WatchedEvent watchedEvent) { - if (null != this.countDownLatch && watchedEvent.getType() == Event.EventType.NodeDeleted) { - // 计数器减一,恢复线程操作 - this.countDownLatch.countDown(); - } - } - -} diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java deleted file mode 100644 index 4a022e9..0000000 --- a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java +++ /dev/null @@ -1,136 +0,0 @@ -package cn.ac.iie.utils.zookeeper; - -import org.apache.commons.lang3.RandomUtils; -import org.apache.log4j.Logger; -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; - - -public class ZookeeperUtils implements Watcher { - private static Logger logger = Logger.getLogger(ZookeeperUtils.class); - - private ZooKeeper zookeeper; - - private static final int SESSION_TIME_OUT = 20000; - - private CountDownLatch countDownLatch = new CountDownLatch(1); - - @Override - public void process(WatchedEvent event) { - if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { - countDownLatch.countDown(); - } - } - - - /** - * 修改节点信息 - * - * @param path 节点路径 - */ - public int modifyNode(String path, String zookeeperIp) { - createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp); - int workerId = 0; - try { - connectZookeeper(zookeeperIp); - Stat stat = zookeeper.exists(path, true); - workerId = Integer.parseInt(getNodeDate(path)); - if (workerId > 55) { - workerId = 0; - zookeeper.setData(path, "1".getBytes(), stat.getVersion()); - } else { - String result = String.valueOf(workerId + 1); - if (stat != null) { - zookeeper.setData(path, result.getBytes(), stat.getVersion()); - } else { - logger.error("Node does not exist!,Can't modify"); - } - } - } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); - workerId = RandomUtils.nextInt(56, 63); - } - finally { - closeConn(); - } - logger.warn("workerID is:" + workerId); - return workerId; - } - - /** - * 连接zookeeper - * - * @param host 地址 - */ - public void connectZookeeper(String host) { - try { - zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); - countDownLatch.await(); - } catch (IOException | InterruptedException e) { - e.printStackTrace(); - } - } - - /** - * 关闭连接 - */ - public void closeConn() { - try { - if (zookeeper != null) { - zookeeper.close(); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - /** - * 获取节点内容 - * - * @param path 节点路径 - * @return 内容/异常null - */ - public String getNodeDate(String path) { - String result = null; - Stat stat = new Stat(); - try { - byte[] resByte = zookeeper.getData(path, true, stat); - result = new String(resByte); - } catch (KeeperException | InterruptedException e) { - logger.error("Get node information exception"); - e.printStackTrace(); - } - return result; - } - - /** - * @param path 节点创建的路径 - * @param date 节点所存储的数据的byte[] - * @param acls 控制权限策略 - */ - public void createNode(String path, byte[] date, List acls, String zookeeperIp) { - try { - connectZookeeper(zookeeperIp); - Stat exists = zookeeper.exists(path, true); - if (exists == null) { - Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true); - if (existsSnowflakeld == null) { - zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT); - } - zookeeper.create(path, date, acls, CreateMode.PERSISTENT); - } else { - logger.warn("Node already exists ! Don't need to create"); - } - } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); - } finally { - closeConn(); - } - } - -} diff --git a/src/test/java/cn/ac/iie/test/DistributedLock.java b/src/test/java/cn/ac/iie/test/DistributedLock.java index 030bf4a..0d45cb8 100644 --- a/src/test/java/cn/ac/iie/test/DistributedLock.java +++ b/src/test/java/cn/ac/iie/test/DistributedLock.java @@ -10,7 +10,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.zookeeper.ZookeeperUtils; +import com.zdjizhi.utils.ZookeeperUtils; import org.apache.log4j.Logger; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; @@ -183,7 +183,7 @@ public class DistributedLock implements Lock, Watcher { try { lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); lock.lock(); - zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC); + zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC,FlowWriteConfig.ZOOKEEPER_SERVERS); } finally { if (lock != null) { lock.unlock();