Dynamic schema fetching code update 0225

lee
2020-02-25 14:16:16 +08:00
parent 13cae17743
commit 28e0b9e38c
14 changed files with 182 additions and 720 deletions

.idea/vcs.xml generated
View File

@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View File

@@ -107,7 +107,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<!--<scope>provided</scope>-->
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@@ -153,7 +153,7 @@
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>

View File

@@ -20,7 +20,7 @@ auto.offset.reset=latest
kafka.topic=SECURITY-EVENT-LOG
# Consumer group for reading the topic; stores this spout id's consumed offsets and can be named after the topology; the stored offset position determines where the next read resumes without duplicates
group.id=security-policy-200204
group.id=security-policy-200224
# Output topic
results.output.topic=SECURITY-EVENT-COMPLETED-LOG
@@ -29,13 +29,13 @@ results.output.topic=SECURITY-EVENT-COMPLETED-LOG
topology.workers=1
# Spout parallelism; recommended to equal the number of Kafka partitions
spout.parallelism=3
spout.parallelism=1
# Parallelism of the enrichment bolt, a multiple of the worker count
datacenter.bolt.parallelism=1
# Parallelism for writing to Kafka
kafka.bolt.parallelism=3
kafka.bolt.parallelism=1
# IP geolocation database path
#ip.library=/home/ceiec/topology/dat/

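For context, these are plain java.util.Properties entries. A minimal sketch of how a topology might load them (the resource name topology.properties and the default value are assumptions, not taken from this commit):

import java.io.InputStream;
import java.util.Properties;

// Hypothetical loader for the properties above; the file name is an assumption.
public final class TopologyConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (InputStream in = TopologyConfigSketch.class
                .getClassLoader().getResourceAsStream("topology.properties")) {
            props.load(in);
        }
        // Parallelism values are plain integers in the file.
        System.out.println("spout.parallelism = "
                + Integer.parseInt(props.getProperty("spout.parallelism", "1")));
    }
}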
View File

@@ -17,7 +17,6 @@ import java.util.HashMap;
import java.util.Map;
import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage;
import static cn.ac.iie.utils.hbase.HBaseUtils.change;
/**
* Connection-relation log completion
@@ -38,7 +37,7 @@ public class ProxyCompletionBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
if (TupleUtils.isTick(tuple)) {
change();
HBaseUtils.change();
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {

View File

@@ -39,8 +39,7 @@ public class SecurityCompletionBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
if (TupleUtils.isTick(tuple)) {
HBaseUtils hBaseUtils = new HBaseUtils(FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS, FlowWriteConfig.HBASE_TABLE_NAME);
hBaseUtils.change();
HBaseUtils.change();
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {

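Both bolts now refresh the shared HBase cache on Storm tick tuples via the static HBaseUtils.change(). For reference, a bolt only receives tick tuples if it requests them; a minimal sketch of that wiring (the 60-second interval is an assumption, not shown in this diff):

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

// Sketch: a bolt that asks Storm for a tick tuple every 60 seconds (interval assumed).
public class TickAwareBoltSketch extends BaseBasicBolt {
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
        return conf;
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        if (TupleUtils.isTick(tuple)) {
            // Periodic work goes here, e.g. HBaseUtils.change() as in the bolts above.
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This sketch emits nothing.
    }
}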
View File

@@ -37,6 +37,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
props.put("max.poll.records", 3000);
props.put("max.partition.fetch.bytes", 31457280);
props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;

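The two added deserializer entries are mandatory for the new-style Kafka consumer; without them the KafkaConsumer constructor fails with a ConfigException. A standalone sketch of a consumer built from equivalent properties (the bootstrap address is a placeholder, and poll(Duration) assumes kafka-clients 2.x):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "security-policy-200224");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("SECURITY-EVENT-LOG"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}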
View File

@@ -1,143 +0,0 @@
package cn.ac.iie.utils.general;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.hbase.HBaseUtils;
import cn.ac.iie.utils.system.SnowflakeId;
import com.google.common.net.InternetDomainName;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import java.util.Base64;
public final class CompleteUtil {
private static Logger logger = Logger.getLogger(CompleteUtil.class);
private static IpLookup ipLookup = new IpLookup.Builder(false)
.loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
.build();
/**
* Derive the top private domain: prefer host, fall back to sni
*
* @param sni sni
* @param host host
* @return top private domain
*/
public static String getTopDomain(String sni, String host) {
if (StringUtil.isNotBlank(host)) {
return getDomainName(host);
} else if (StringUtil.isNotBlank(sni)) {
return getDomainName(sni);
} else {
return "";
}
}
/**
* Extract the top private domain from a host/URL
*
* @param host site URL/host
* @return top private domain
*/
public static String getDomainName(String host) {
String domain = "";
try {
domain = InternetDomainName.from(host).topPrivateDomain().toString();
} catch (Exception e) {
logger.error("Failed to parse the top domain from host: " + e.getMessage());
}
return domain;
}
/**
* Return the current Unix timestamp in seconds
*/
public static int getCurrentTime() {
return (int)(System.currentTimeMillis() / 1000);
}
/**
* Generate an ID with the snowflake scheme
*
* @return
*/
public static long getSnowflakeId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) {
return SnowflakeId.generateId(zookeeperIp,kafkaTopic,dataCenterIdNum);
}
/**
* Look up location detail by client IP
*
* @param ip
* @return
*/
public static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
/**
* Look up ASN info by IP
*
* @param ip
* @return asn
*/
public static String getGeoAsn(String ip) {
return ipLookup.asnLookup(ip, true);
}
/**
* Look up country by IP
*
* @param ip
* @return country
*/
public static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
/**
* Match the subscriber name in HBase by IP
* @param clientIp
* @param hbaseZookeeper
* @param hbaseTable
* @return subscriber_id
*/
public static String radiusMatch(String clientIp, String hbaseZookeeper, String hbaseTable) {
return HBaseUtils.getAccount(clientIp,hbaseZookeeper,hbaseTable);
}
/**
* Base64 decode
*
* @param encodedText mail subject
* @param subjectCharset charset
* @return decoded content, or empty string
*/
public static String base64Str(String encodedText, String subjectCharset) {
Base64.Decoder decoder = Base64.getDecoder();
String sub;
try {
if (StringUtil.isNotBlank(subjectCharset)) {
sub = new String(decoder.decode(encodedText), subjectCharset);
} else {
sub = new String(decoder.decode(encodedText), "UTF-8");
}
return sub;
} catch (Exception e) {
logger.error("transform base64 String failed! base64Str = " + encodedText + ", Charset = " + subjectCharset + ", error: " + e);
return "";
}
}
}

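The deleted CompleteUtil helpers move into TransFormUtils below or into the galaxy FormatUtils. For reference, the Guava call they were built on behaves as follows (standalone sketch; the sample domain is illustrative):

import com.google.common.net.InternetDomainName;

public class DomainSketch {
    public static void main(String[] args) {
        // topPrivateDomain() strips subdomains using the public-suffix list.
        System.out.println(InternetDomainName.from("www.baidu.com").topPrivateDomain()); // baidu.com
    }
}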
View File

@@ -4,18 +4,15 @@ package cn.ac.iie.utils.general;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.hbase.HBaseUtils;
import cn.ac.iie.utils.json.JsonParseUtil;
import cn.ac.iie.utils.system.SnowflakeId;
import com.alibaba.fastjson.JSONObject;
import com.google.common.net.InternetDomainName;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.regex.Pattern;
import static cn.ac.iie.utils.general.CompleteUtil.*;
/**
@@ -34,7 +31,15 @@ public class TransFormUtils {
private static Object mapObject = JsonParseUtil.generateObject(map);
// Fetch the job list from the schema HTTP service
private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
// Enrichment helper
private static FormatUtils build = new FormatUtils.Builder(false).build();
// IP geolocation helper
private static IpLookup ipLookup = new IpLookup.Builder(false)
.loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
.build();
/**
* Parse the log and enrich it
*
@@ -43,10 +48,7 @@ public class TransFormUtils {
*/
public static String dealCommonMessage(String message) {
Object object = JSONObject.parseObject(message, mapObject.getClass());
// System.out.println("before enrichment ===> " + JSON.toJSONString(object));
try {
for (String[] strings : jobList) {
@@ -66,17 +68,17 @@ public class TransFormUtils {
if (function.equals("current_timestamp")) {
JsonParseUtil.setValue(object, strings[1], getCurrentTime());
} else if (function.equals("snowflake_id")) {
JsonParseUtil.setValue(object, strings[1], getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS,FlowWriteConfig.KAFKA_TOPIC,FlowWriteConfig.DATA_CENTER_ID_NUM));
JsonParseUtil.setValue(object, strings[1], build.getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS,FlowWriteConfig.KAFKA_TOPIC,FlowWriteConfig.DATA_CENTER_ID_NUM));
} else if (function.equals("geo_ip_detail")) {
JsonParseUtil.setValue(object, strings[1], getGeoIpDetail(name.toString()));
} else if (function.equals("geo_asn")) {
JsonParseUtil.setValue(object, strings[1], getGeoAsn(name.toString()));
} else if (function.equals("radius_match")) {
JsonParseUtil.setValue(object, strings[1], radiusMatch(name.toString(),FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS,FlowWriteConfig.HBASE_TABLE_NAME));
JsonParseUtil.setValue(object, strings[1], radiusMatch(name.toString()));
} else if (function.equals("geo_ip_country")) {
JsonParseUtil.setValue(object, strings[1], getGeoIpCountry(name.toString()));
} else if (function.equals("decode_of_base64") && param != null){
JsonParseUtil.setValue(object, strings[1], base64Str(name.toString(),param.toString()));
JsonParseUtil.setValue(object, strings[1], FormatUtils.base64Str(name.toString(),param.toString()));
} else if (name.equals("http_host") && function.equals("sub_domain")) {
if (appendTo == null || StringUtil.isBlank(appendTo.toString())) {
JsonParseUtil.setValue(object, strings[1], getTopDomain(null, name.toString()));
@@ -102,11 +104,96 @@ public class TransFormUtils {
}
@Test
public void aaa() {
String sni = "www.baidu.com";
System.out.println(getTopDomain(sni, null));
System.out.println(getTopDomain(null,sni));
// @Test
// public void aaa() {
// String sni = "www.baidu.com";
// System.out.println(getTopDomain(sni, null));
// System.out.println(getTopDomain(null,sni));
//
// }
/**
* Derive the top private domain: prefer host, fall back to sni
*
* @param sni sni
* @param host host
* @return top private domain
*/
private static String getTopDomain(String sni, String host) {
if (StringUtil.isNotBlank(host)) {
return getDomainName(host);
} else if (StringUtil.isNotBlank(sni)) {
return getDomainName(sni);
} else {
return "";
}
}
/**
* Extract the top private domain from a host/URL
*
* @param host site URL/host
* @return top private domain
*/
private static String getDomainName(String host) {
String domain = "";
try {
domain = InternetDomainName.from(host).topPrivateDomain().toString();
} catch (Exception e) {
logger.error("Failed to parse the top domain from host: " + e.getMessage());
}
return domain;
}
/**
* Return the current Unix timestamp in seconds
*/
private static int getCurrentTime() {
return (int)(System.currentTimeMillis() / 1000);
}
/**
* Look up location detail by client IP
*
* @param ip
* @return
*/
private static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
/**
* Look up ASN info by IP
*
* @param ip
* @return
*/
private static String getGeoAsn(String ip) {
return ipLookup.asnLookup(ip, true);
}
/**
* Look up country by IP
*
* @param ip
* @return
*/
private static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
/**
* Complete the radius account via the HBase cache
*
* @param ip
* @return
*/
private static String radiusMatch(String ip) {
return HBaseUtils.getAccount(ip);
}
}

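dealCommonMessage dispatches on a function name delivered per field by the schema service, which is what makes the schema dynamic. A generic, standalone sketch of that dispatch pattern (the registry and the stand-in lambdas are illustrative, not the project's API; the real code uses the if/else chain above over jobList):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch: map a function name from the job list to an enrichment lambda.
public class EnrichDispatchSketch {
    static final Map<String, Function<String, Object>> FUNCTIONS = new HashMap<>();
    static {
        FUNCTIONS.put("current_timestamp", v -> (int) (System.currentTimeMillis() / 1000));
        FUNCTIONS.put("to_upper", v -> v.toUpperCase()); // stand-in for geo/radius lookups
    }

    public static void main(String[] args) {
        String function = "current_timestamp"; // in the real code this comes from jobList
        System.out.println(FUNCTIONS.get(function).apply("192.168.50.12"));
    }
}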
View File

@@ -1,5 +1,6 @@
package cn.ac.iie.utils.hbase;
import cn.ac.iie.common.FlowWriteConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -12,55 +13,34 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
/**
* HBase utility class
*
* @author qidaijie
*/
public class HBaseUtils {
private final static Logger logger = Logger.getLogger(HBaseUtils.class);
private static Map<String, String> subIdMap = new HashMap<>(333334);
// private static Map<String, String> subIdMap = new ConcurrentSkipListMap<>();
private static Connection connection;
private static Long time;
private static String zookeeperIp;
private static String hbaseTable;
private static HBaseUtils hBaseUtils;
private static void getHbaseInstance(String zookeeperServer, String hbaseTableName) {
hBaseUtils = new HBaseUtils(zookeeperServer, hbaseTableName);
}
/**
* Constructor - new - 20191023
*/
public HBaseUtils(String zookeeperServer, String hbaseTableName) {
zookeeperIp = zookeeperServer;
hbaseTable = hbaseTableName;
// Acquire the connection
getHbaseConn();
// Pull the full data set
getAll();
// Schedule the periodic refresh
updateHabaseCache();
}
private static void getHbaseConn() {
static {
// HBase configuration
Configuration configuration = HBaseConfiguration.create();
// Set the ZooKeeper quorum
configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
configuration.set("hbase.client.retries.number", "3");
configuration.set("hbase.bulkload.retries.number", "3");
configuration.set("zookeeper.recovery.retry", "3");
try {
// HBase configuration
Configuration configuration = HBaseConfiguration.create();
// Set the ZooKeeper quorum
configuration.set("hbase.zookeeper.quorum", zookeeperIp);
configuration.set("hbase.client.retries.number", "3");
configuration.set("hbase.bulkload.retries.number", "3");
configuration.set("zookeeper.recovery.retry", "3");
connection = ConnectionFactory.createConnection(configuration);
time = System.currentTimeMillis();
logger.warn("HBaseUtils get HBase connection,now to getAll().");
} catch (IOException ioe) {
logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
ioe.printStackTrace();
} catch (Exception e) {
logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
getAll();
} catch (IOException e) {
logger.error("Failed to get the HBase connection");
e.printStackTrace();
}
}
@@ -69,7 +49,7 @@ public class HBaseUtils {
* Refresh the cache incrementally
*/
public static void change() {
long nowTime = System.currentTimeMillis();
Long nowTime = System.currentTimeMillis();
timestampsFilter(time - 1000, nowTime + 500);
}
@@ -85,7 +65,7 @@ public class HBaseUtils {
ResultScanner scanner = null;
Scan scan2 = new Scan();
try {
table = connection.getTable(TableName.valueOf("sub:" + hbaseTable));
table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
scan2.setTimeRange(startTime, endTime);
scanner = table.getScanner(scan2);
for (Result result : scanner) {
@@ -103,14 +83,10 @@ public class HBaseUtils {
}
}
Long end = System.currentTimeMillis();
logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size());
logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + begin + ",EndTime: " + end);
logger.warn("Current cache size: " + subIdMap.keySet().size());
logger.warn("Cache refresh took: " + (end - begin) + ", start: " + begin + ", end: " + end);
time = endTime;
} catch (IOException ioe) {
logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<===");
ioe.printStackTrace();
} catch (Exception e) {
logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<===");
} catch (IOException e) {
e.printStackTrace();
} finally {
if (scanner != null) {
@@ -130,9 +106,9 @@ public class HBaseUtils {
* Fetch all key-value pairs
*/
private static void getAll() {
long begin = System.currentTimeMillis();
Long begin = System.currentTimeMillis();
try {
Table table = connection.getTable(TableName.valueOf("sub:" + hbaseTable));
Table table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
@@ -141,14 +117,10 @@ public class HBaseUtils {
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
}
}
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin));
logger.warn("Cache size after the full load: " + subIdMap.size());
logger.warn("Full load took: " + (System.currentTimeMillis() - begin));
scanner.close();
} catch (IOException ioe) {
logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
ioe.printStackTrace();
} catch (Exception e) {
logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
} catch (IOException e) {
e.printStackTrace();
}
}
@@ -159,26 +131,8 @@ public class HBaseUtils {
* @param clientIp client_ip
* @return account
*/
public static String getAccount(String clientIp, String hbaseZookeeper, String hbaseTable) {
if (hBaseUtils == null) {
getHbaseInstance(hbaseZookeeper, hbaseTable);
}
public static String getAccount(String clientIp) {
return subIdMap.get(clientIp);
}
private void updateHabaseCache() {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
change();
} catch (Exception e) {
logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
e.printStackTrace();
}
}
}, 1, 1000 * 60);
}
}

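The rewritten HBaseUtils drops the lazy singleton: a static block opens the connection and loads the full table once per JVM, and change() then rescans only cells written since the last refresh using Scan.setTimeRange. A standalone sketch of that incremental pattern (quorum, table name, and the 60-second window are placeholders):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class TimeRangeScanSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk1:2181"); // placeholder
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("sub:radius"))) { // placeholder name
            Scan scan = new Scan();
            // Only cells written inside the window are returned, so a periodic
            // rescan stays cheap compared to reloading the whole table.
            long now = System.currentTimeMillis();
            scan.setTimeRange(now - 60_000, now);
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    System.out.println(result);
                }
            }
        }
    }
}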
View File

@@ -1,190 +0,0 @@
package cn.ac.iie.utils.system;
import cn.ac.iie.utils.zookeeper.ZooKeeperLock;
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
import org.apache.log4j.Logger;
public class SnowflakeId {
private static Logger logger = Logger.getLogger(SnowflakeId.class);
/**
* Epoch (2019-08-01 00:00:00 +08:00); supports roughly 17 years of IDs
*/
private final long twepoch = 1564588800000L;
/**
* Number of bits for the worker id
*/
private final long workerIdBits = 6L;
/**
* Number of bits for the data-center id
*/
private final long dataCenterIdBits = 4L;
/**
* Maximum supported worker id, 63 (the shift trick quickly yields the largest decimal value an n-bit binary number can hold)
* M << n = M * 2^n
*/
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
/**
* Maximum supported data-center id, 15
*/
private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
/**
* Number of bits for the intra-millisecond sequence
*/
private final long sequenceBits = 14L;
/**
* Worker id is shifted left by 14 bits (the sequence width)
*/
private final long workerIdShift = sequenceBits;
/**
* Data-center id is shifted left by 20 bits (14+6)
*/
private final long dataCenterIdShift = sequenceBits + workerIdBits;
/**
* Timestamp is shifted left by 24 bits (14+6+4)
*/
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
/**
* Mask for the sequence, 16383 here
*/
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/**
* Worker id (0~63)
*/
private long workerId;
/**
* Data-center id (0~15)
*/
private long dataCenterId;
/**
* Intra-millisecond sequence (0~16383)
*/
private long sequence = 0L;
/**
* Timestamp of the last generated id
*/
private long lastTimestamp = -1L;
private static SnowflakeId idWorker;
private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
private static void getSnowflakeldInstance(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) {
idWorker = new SnowflakeId(zookeeperIp, kafkaTopic, dataCenterIdNum);
}
/**
* Constructor
*/
private SnowflakeId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) {
ZooKeeperLock lock = new ZooKeeperLock(zookeeperIp, "/locks", "disLocks1");
if (lock.lock()) {
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + kafkaTopic, zookeeperIp);
if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
}
this.workerId = tmpWorkerId;
this.dataCenterId = dataCenterIdNum;
try {
lock.unlock();
} catch (InterruptedException ie) {
ie.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
logger.error("This is not usual error!!!===>>>" + e + "<<<===");
}
}
}
/**
* Get the next ID (this method is thread-safe)
*
* @return SnowflakeId
*/
private synchronized long nextId() {
long timestamp = timeGen();
// If the current time is before the last ID's timestamp, the system clock moved backwards; throw
if (timestamp < lastTimestamp) {
throw new RuntimeException(
String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}
// Same millisecond as the last ID: advance the intra-millisecond sequence
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
// The sequence overflowed within this millisecond
if (sequence == 0) {
// Block until the next millisecond for a fresh timestamp
timestamp = tilNextMillis(lastTimestamp);
}
}
// Timestamp changed: reset the intra-millisecond sequence
else {
sequence = 0L;
}
// Remember the timestamp of this ID
lastTimestamp = timestamp;
// Shift and OR the parts together into the 64-bit ID
return ((timestamp - twepoch) << timestampLeftShift)
| (dataCenterId << dataCenterIdShift)
| (workerId << workerIdShift)
| sequence;
}
/**
* Block until the next millisecond to obtain a fresh timestamp
*
* @param lastTimestamp timestamp of the last generated ID
* @return current timestamp
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
/**
* Return the current time in milliseconds
*
* @return current time (ms)
*/
protected long timeGen() {
return System.currentTimeMillis();
}
/**
* Static factory method
*
* @return
*/
public static Long generateId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) {
if (idWorker == null) {
getSnowflakeldInstance(zookeeperIp, kafkaTopic, dataCenterIdNum);
}
return idWorker.nextId();
}
}

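The deleted generator packs a 64-bit ID as | 39-bit timestamp delta | 4-bit dataCenterId | 6-bit workerId | 14-bit sequence |, which is where the roughly 17-year range and the shift amounts 14/20/24 come from. A minimal standalone sketch of the same arithmetic (the sample inputs are illustrative):

// Sketch of the deleted class's bit layout.
public class SnowflakeLayoutSketch {
    static final long TWEPOCH = 1564588800000L; // 2019-08-01 00:00:00 +08:00
    static final long SEQ_BITS = 14, WORKER_BITS = 6, DC_BITS = 4;

    static long compose(long timestampMs, long dataCenterId, long workerId, long sequence) {
        return ((timestampMs - TWEPOCH) << (SEQ_BITS + WORKER_BITS + DC_BITS)) // << 24
                | (dataCenterId << (SEQ_BITS + WORKER_BITS))                   // << 20
                | (workerId << SEQ_BITS)                                       // << 14
                | sequence;
    }

    public static void main(String[] args) {
        // Limits: workerId <= 63, dataCenterId <= 15, sequence <= 16383.
        System.out.println(compose(System.currentTimeMillis(), 12, 3, 0));
    }
}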
View File

@@ -1,140 +0,0 @@
package cn.ac.iie.utils.zookeeper;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class ZooKeeperLock implements Watcher {
private static Logger logger = Logger.getLogger(ZooKeeperLock.class);
private ZooKeeper zk = null;
private String rootLockNode; // root node for locks
private String lockName; // contended resource, used to name child nodes
private String currentLock; // current lock node
private String waitLock; // lock being waited on (the previous node)
private CountDownLatch countDownLatch; // latch used to block the locking thread while it waits
private int sessionTimeout = 30000; // session timeout
// 1. Constructor: create the ZK connection and the root lock node
public ZooKeeperLock(String zkAddress, String rootLockNode, String lockName) {
this.rootLockNode = rootLockNode;
this.lockName = lockName;
try {
// Create the connection; zkAddress format is IP:PORT
zk = new ZooKeeper(zkAddress, this.sessionTimeout, this);
// Create the root lock node if it does not exist
Stat stat = zk.exists(rootLockNode, false);
if (null == stat) {
zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (IOException | InterruptedException | KeeperException e) {
e.printStackTrace();
logger.error("ZooKeeperLock Constructors ===>>>Node already exists!");
}
}
// 2. Lock: try to acquire; on failure wait for the previous lock to be released
public boolean lock() {
if (this.tryLock()) {
logger.warn("ZooKeeperLock method lock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] addZkLock(" + this.currentLock + ")success");
return true;
} else {
return waitOtherLock(this.waitLock, this.sessionTimeout);
}
}
public boolean tryLock() {
// delimiter
String split = "_lock_";
if (this.lockName.contains("_lock_")) {
throw new RuntimeException("lockName can't contains '_lock_' ");
}
try {
// Create the lock node (ephemeral sequential)
this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.warn("ZooKeeperLock method tryLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] create zkLockNode(" + this.currentLock + ")success, begin to election...");
// Fetch all child nodes
List<String> nodes = zk.getChildren(this.rootLockNode, false);
// Collect all locks contending for lockName
List<String> lockNodes = new ArrayList<String>();
for (String nodeName : nodes) {
if (nodeName.split(split)[0].equals(this.lockName)) {
lockNodes.add(nodeName);
}
}
Collections.sort(lockNodes);
// Compare the smallest node with the current node; equal means the lock is acquired
String currentLockPath = this.rootLockNode + "/" + lockNodes.get(0);
if (this.currentLock.equals(currentLockPath)) {
return true;
}
// Lock failed: set the previous node as the one to wait on
String currentLockNode = this.currentLock.substring(this.currentLock.lastIndexOf("/") + 1);
int preNodeIndex = Collections.binarySearch(lockNodes, currentLockNode) - 1;
this.waitLock = lockNodes.get(preNodeIndex);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
return false;
}
private boolean waitOtherLock(String waitLock, int sessionTimeout) {
boolean islock = false;
try {
// Watch the lock node being waited on
String waitLockNode = this.rootLockNode + "/" + waitLock;
Stat stat = zk.exists(waitLockNode, true);
if (null != stat) {
logger.error("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] zkLock(" + this.currentLock + ")addZkLock fail,wait lock(" + waitLockNode + ")release...");
// Set up a latch and use it to block the thread
this.countDownLatch = new CountDownLatch(1);
islock = this.countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
if (islock) {
logger.warn("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] zkLock(" + this.currentLock + ")addZkLock success,lock(" + waitLockNode + ")release over.");
} else {
logger.error("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] zkLock(" + this.currentLock + ")addZkLock fail...");
}
} else {
islock = true;
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
return islock;
}
// 3. Release the lock
public void unlock() throws InterruptedException {
try {
Stat stat = zk.exists(this.currentLock, false);
if (null != stat) {
logger.warn("ZooKeeperLock method unlock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] free zkLock " + this.currentLock);
zk.delete(this.currentLock, -1);
this.currentLock = null;
}
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
} finally {
zk.close();
}
}
// 4. Watcher callback
@Override
public void process(WatchedEvent watchedEvent) {
if (null != this.countDownLatch && watchedEvent.getType() == Event.EventType.NodeDeleted) {
// Count down the latch to resume the blocked thread
this.countDownLatch.countDown();
}
}
}

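For reference, this is how SnowflakeId's constructor used the deleted lock before both moved into the galaxy library; a usage sketch with a placeholder ZooKeeper address:

// Usage sketch for the ZooKeeperLock defined above; the address is a placeholder.
public class LockUsageSketch {
    public static void main(String[] args) throws InterruptedException {
        ZooKeeperLock lock = new ZooKeeperLock("zk1:2181", "/locks", "disLocks1");
        if (lock.lock()) {
            try {
                // Critical section, e.g. bumping the shared worker-id counter.
            } finally {
                lock.unlock(); // also closes the ZK session
            }
        }
    }
}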
View File

@@ -1,136 +0,0 @@
package cn.ac.iie.utils.zookeeper;
import org.apache.commons.lang3.RandomUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZookeeperUtils implements Watcher {
private static Logger logger = Logger.getLogger(ZookeeperUtils.class);
private ZooKeeper zookeeper;
private static final int SESSION_TIME_OUT = 20000;
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
/**
* Modify the node data
*
* @param path node path
*/
public int modifyNode(String path, String zookeeperIp) {
createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp);
int workerId = 0;
try {
connectZookeeper(zookeeperIp);
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
if (workerId > 55) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
} else {
String result = String.valueOf(workerId + 1);
if (stat != null) {
zookeeper.setData(path, result.getBytes(), stat.getVersion());
} else {
logger.error("Node does not exist!,Can't modify");
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
workerId = RandomUtils.nextInt(56, 63);
}
finally {
closeConn();
}
logger.warn("workerID is " + workerId);
return workerId;
}
/**
* Connect to ZooKeeper
*
* @param host address
*/
public void connectZookeeper(String host) {
try {
zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* Close the connection
*/
public void closeConn() {
try {
if (zookeeper != null) {
zookeeper.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Get the node content
*
* @param path node path
* @return content, or null on error
*/
public String getNodeDate(String path) {
String result = null;
Stat stat = new Stat();
try {
byte[] resByte = zookeeper.getData(path, true, stat);
result = new String(resByte);
} catch (KeeperException | InterruptedException e) {
logger.error("Get node information exception");
e.printStackTrace();
}
return result;
}
/**
* @param path path of the node to create
* @param date byte[] payload stored at the node
* @param acls ACL policy
*/
public void createNode(String path, byte[] date, List<ACL> acls, String zookeeperIp) {
try {
connectZookeeper(zookeeperIp);
Stat exists = zookeeper.exists(path, true);
if (exists == null) {
Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true);
if (existsSnowflakeld == null) {
zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT);
}
zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
} else {
logger.warn("Node already exists ! Don't need to create");
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
} finally {
closeConn();
}
}
}

View File

@@ -10,7 +10,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
import com.zdjizhi.utils.ZookeeperUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
@@ -183,7 +183,7 @@ public class DistributedLock implements Lock, Watcher {
try {
lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
lock.lock();
zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC,FlowWriteConfig.ZOOKEEPER_SERVERS);
} finally {
if (lock != null) {
lock.unlock();

View File

@@ -1,22 +1,53 @@
package cn.ac.iie.test;
import cn.ac.iie.common.FlowWriteConfig;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.net.InternetDomainName;
import com.zdjizhi.utils.*;
import org.apache.log4j.Logger;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
public class test {
private static Logger logger = Logger.getLogger(test.class);
public static void main(String[] args) {
String message = "{\"common_schema_type\":\"SSL\",\"ssl_sni\":\"www.myssl.cn\",\"common_server_ip\":\"101.37.81.250\",\"common_client_ip\":\"192.168.50.12\",\"common_server_port\":443,\"common_client_port\":51398,\"common_stream_dir\":3,\"common_address_type\":4,\"common_s2c_pkt_num\":1,\"common_s2c_byte_num\":0,\"common_c2s_pkt_num\":3,\"common_c2s_byte_num\":517,\"common_stream_trace_id\":7629423224647,\"common_l4_protocol\":\"IPv4_TCP\",\"common_address_list\":\"51398-443-192.168.50.12-101.37.81.250\",\"common_sled_ip\":\"192.168.40.161\",\"common_start_time\":1575008799,\"common_end_time\":1575008799,\"common_con_duration_ms\":0,\"common_policy_id\":282,\"common_service\":0,\"common_action\":128,\"common_user_region\":\"{\\\"protocol\\\":\\\"SSL\\\"}\"}";
JSONObject jsonObject = JSON.parseObject(message);
String common_server_ip = jsonObject.getString("common_server_ip");
String common_client_ip = jsonObject.getString("common_client_ip");
static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
jsonObject.put("common__recv_time",(System.currentTimeMillis() / 1000));
//
@Test
public void test() throws InterruptedException {
File file = new File("D:\\123\\test.txt");
String zookeeperIp ="192.168.40.224:2181";
String kafkaTopic ="CONNECTION-RECORD-LOG";
System.out.println(zookeeperUtils.modifyNode("/Snowflake/" + kafkaTopic, zookeeperIp));
System.out.println(zookeeperUtils.modifyNode("/Snowflake/" + kafkaTopic, zookeeperIp));
// ArrayList<Long> list = Lists.newArrayList();
// for (int i = 1; i <= 500; i++) {
// ZooKeeperLock lock = new ZooKeeperLock(zookeeperIp, "/locks", "disLocks1");
// if (lock.lock()) {
// int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + kafkaTopic, zookeeperIp);
// Long generateId = SnowflakeId.generateId(tmpWorkerId, 12);
// System.err.println(generateId);
// list.add(generateId);
// lock.unlock();
// }
// if(i%5==0) {
//// fileWrite(list, file);
// Thread.sleep(1000);
// }
// }
// System.err.println("process 2 finished");
// FormatUtils build = new FormatUtils.Builder(false).build();
// long snowflakeId = build.getSnowflakeId("192.168.40.224:2181", "CONNECTION-RECORD-LOG", 12);
// System.err.println(snowflakeId);
System.out.println(jsonObject.toString());
}
}