工具类打成jar包

This commit is contained in:
lee
2020-02-06 14:28:54 +08:00
parent 7e9eb8d4b9
commit 13cae17743
15 changed files with 394 additions and 336 deletions

12
.idea/artifacts/completeUtil.xml generated Normal file
View File

@@ -0,0 +1,12 @@
<component name="ArtifactManager">
<artifact type="jar" name="completeUtil">
<output-path>$PROJECT_DIR$/out/artifacts/completeUtil</output-path>
<root id="archive" name="completeUtil.jar">
<element id="file-copy" path="$PROJECT_DIR$/src/main/java/cn/ac/iie/utils/general/CompleteUtil.java" />
<element id="file-copy" path="$PROJECT_DIR$/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java" />
<element id="file-copy" path="$PROJECT_DIR$/src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java" />
<element id="file-copy" path="$PROJECT_DIR$/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java" />
<element id="file-copy" path="$PROJECT_DIR$/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java" />
</root>
</artifact>
</component>

16
.idea/compiler.xml generated Normal file
View File

@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="log-stream-completion" />
</profile>
</annotationProcessing>
<bytecodeTargetLevel>
<module name="log-stream-completion" target="1.8" />
</bytecodeTargetLevel>
</component>
</project>

6
.idea/encodings.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$" charset="UTF-8" />
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

View File

@@ -1,13 +1,14 @@
#管理kafka地址
bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
#bootstrap.servers=192.168.40.186:9092
#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
bootstrap.servers=192.168.40.151:9092
#zookeeper 地址
zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
zookeeper.servers=192.168.40.151:2181
#zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
#hbase zookeeper地址
#hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
hbase.zookeeper.servers=192.168.40.224:2182
hbase.zookeeper.servers=192.168.40.151:2181
#hbase tablename
hbase.table.name=subscriber_info
@@ -19,7 +20,7 @@ auto.offset.reset=latest
kafka.topic=SECURITY-EVENT-LOG
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
group.id=security-policy-191216
group.id=security-policy-200204
#输出topic
results.output.topic=SECURITY-EVENT-COMPLETED-LOG
@@ -43,7 +44,7 @@ ip.library=D:\\dat\\
#kafka批量条数
batch.insert.num=2000
#网关的schema位置
schema.http=http://192.168.40.224:9999/metadata/schema/v1/fields/security_event_log
schema.http=http://192.168.40.151:9999/metadata/schema/v1/fields/security_event_log
#数据中心UID
data.center.id.num=15

View File

@@ -1,6 +1,7 @@
package cn.ac.iie.bolt.proxy;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.hbase.HBaseUtils;
import cn.ac.iie.utils.system.TupleUtils;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;

View File

@@ -39,7 +39,8 @@ public class SecurityCompletionBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
if (TupleUtils.isTick(tuple)) {
HBaseUtils.change();
HBaseUtils hBaseUtils = new HBaseUtils(FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS, FlowWriteConfig.HBASE_TABLE_NAME);
hBaseUtils.change();
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {

View File

@@ -0,0 +1,143 @@
package cn.ac.iie.utils.general;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.hbase.HBaseUtils;
import cn.ac.iie.utils.system.SnowflakeId;
import com.google.common.net.InternetDomainName;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import java.util.Base64;
public final class CompleteUtil {
private static Logger logger = Logger.getLogger(TransFormUtils.class);
private static IpLookup ipLookup = new IpLookup.Builder(false)
.loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
.build();
/**
* 有sni通过sni获取域名有host根据host获取域名
*
* @param sni sni
* @param host host
* @return 顶级域名
*/
public static String getTopDomain(String sni, String host) {
if (StringUtil.isNotBlank(host)) {
return getDomainName(host);
} else if (StringUtil.isNotBlank(sni)) {
return getDomainName(sni);
} else {
return "";
}
}
/**
* 根据url截取顶级域名
*
* @param host 网站url
* @return 顶级域名
*/
public static String getDomainName(String host) {
String domain = "";
try {
domain = InternetDomainName.from(host).topPrivateDomain().toString();
} catch (Exception e) {
logger.error("host解析顶级域名异常: " + e.getMessage());
}
return domain;
}
/**
* 生成当前时间戳的操作
*/
public static int getCurrentTime() {
return (int)(System.currentTimeMillis() / 1000);
}
/**
* 雪花模型生成id
*
* @return
*/
public static long getSnowflakeId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) {
return SnowflakeId.generateId(zookeeperIp,kafkaTopic,dataCenterIdNum);
}
/**
* 根据clientIp获取location信息
*
* @param ip
* @return
*/
public static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
/**
* 根据ip获取asn信息
*
* @param ip
* @return asn
*/
public static String getGeoAsn(String ip) {
return ipLookup.asnLookup(ip, true);
}
/**
* 根据ip获取country信息
*
* @param ip
* @return country
*/
public static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
/**
* 根据ip去hbase中匹配对应的用户名
* @param clientIp
* @param hbaseZookeeper
* @param hbaseTable
* @return 用户名 subscriber_id
*/
public static String radiusMatch(String clientIp, String hbaseZookeeper, String hbaseTable) {
return HBaseUtils.getAccount(clientIp,hbaseZookeeper,hbaseTable);
}
/**
* base64 解码
*
* @param encodedText mail subject
* @param subjectCharset 编码格式
* @return 解码内容 / 空
*/
public static String base64Str(String encodedText, String subjectCharset) {
Base64.Decoder decoder = Base64.getDecoder();
String sub;
try {
if (StringUtil.isNotBlank(subjectCharset)) {
sub = new String(decoder.decode(encodedText), subjectCharset);
} else {
sub = new String(decoder.decode(encodedText), "UTF-8");
}
return sub;
} catch (Exception e) {
logger.error("transform base64 String failed! base64Str = " + encodedText + "Charset = " + subjectCharset + "error :" + e);
return "";
}
}
}

View File

@@ -15,6 +15,8 @@ import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.regex.Pattern;
import static cn.ac.iie.utils.general.CompleteUtil.*;
/**
* 描述:转换或补全工具类
@@ -25,16 +27,6 @@ import java.util.regex.Pattern;
public class TransFormUtils {
private static Logger logger = Logger.getLogger(TransFormUtils.class);
private final static Set<String> PUBLIC_SUFFIX_SET = new HashSet<String>(
Arrays.asList("com|org|net|gov|edu|co|tv|mobi|info|asia|xxx|onion|cc|cn|com.cn|edu.cn|gov.cn|net.cn|org.cn|jp|kr|tw|com.hk|hk|com.hk|org.hk|se|com.se|org.se"
.split("\\|")));
private static Pattern IP_PATTERN = Pattern.compile("(\\d{1,3}\\.){3}(\\d{1,3})");
private static IpLookup ipLookup = new IpLookup.Builder(false)
.loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
.build();
//在内存中加载反射类用的map
private static HashMap<String, Class> map = JsonParseUtil.getMapFromhttp(FlowWriteConfig.SCHEMA_HTTP);
@@ -45,55 +37,53 @@ public class TransFormUtils {
/**
* 解析日志,并补全
* 补domain,补subscriber_id
*
* @param message Security原始日志
* @param message kafka Topic原始日志
* @return 补全后的日志
* <p>
* current_timestamp
* snowflake_id
* geo_ip_detail
* geo_asn
* radius_match
* geo_ip_country
* geo_asn
* sub_domain
* sub_domain
*/
public static String dealCommonMessage(String message) {
// message="{\"ssl_sni\":\"pos.baidu.com\",\"ssl_version\":\"v3\",\"ssl_cn\":\"baidu.com\",\"ssl_san\":\"baidu.com;click.hm.baidu.com;cm.pos.baidu.com;log.hm.baidu.com;update.pan.baidu.com;wn.pos.baidu.com;*.91.com;*.aipage.cn;*.aipage.com;*.apollo.auto;*.baidu.com;*.baidubce.com;*.baiducontent.com;*.baidupcs.com;*.baidustatic.com;*.baifubao.com;*.bce.baidu.com;*.bcehost.com;*.bdimg.com;*.bdstatic.com;*.bdtjrcv.com;*.bj.baidubce.com;*.chuanke.com;*.dlnel.com;*.dlnel.org;*.dueros.baidu.com;*.eyun.baidu.com;*.fanyi.baidu.com;*.gz.baidubce.com;*.hao123.baidu.com;*.hao123.com;*.hao222.com;*.haokan.com;*.im.baidu.com;*.map.baidu.com;*.mbd.baidu.com;*.mipcdn.com;*.news.baidu.com;*.nuomi.com;*.safe.baidu.com;*.smartapps.cn;*.su.baidu.com;*.trustgo.com;*.xueshu.baidu.com;apollo.auto;baifubao.com;dwz.cn;mct.y.nuomi.com;www.baidu.cn;www.baidu.com.cn\",\"common_schema_type\":\"SSL\",\"common_server_ip\":\"182.61.200.109\",\"common_client_ip\":\"192.168.50.144\",\"common_server_port\":443,\"common_client_port\":50529,\"common_stream_dir\":3,\"common_address_type\":4,\"common_s2c_pkt_num\":46,\"common_s2c_byte_num\":33149,\"common_c2s_pkt_num\":23,\"common_c2s_byte_num\":6147,\"common_start_time\":1576744784,\"common_end_time\":1576744799,\"common_con_duration_ms\":15000,\"common_stream_trace_id\":7686307990192,\"common_l4_protocol\":\"IPv4_TCP\",\"common_address_list\":\"50529-443-192.168.50.144-182.61.200.109\",\"common_sled_ip\":\"192.168.40.21\",\"common_policy_id\":172,\"common_service\":0,\"common_action\":2,\"common_user_region\":\"{\\\"protocol\\\":\\\"SSL\\\",\\\"protocol_version\\\":{\\\"allow_http2\\\":1,\\\"min\\\":\\\"ssl3\\\",\\\"max\\\":\\\"tls13\\\",\\\"mirror_client\\\":1},\\\"dynamic_bypass\\\":{\\\"mutual_authentication\\\":1,\\\"cert_pinning\\\":1,\\\"cert_transparency\\\":0,\\\"protocol_errors\\\":1,\\\"ev_cert\\\":0},\\\"decrypt_mirror\\\":{\\\"enable\\\":0},\\\"certificate_checks\\\":{\\\"fail_action\\\":\\\"pass-through\\\",\\\"approach\\\":{\\\"self-signed\\\":1,\\\"expiration\\\":1,\\\"cn\\\":1,\\\"issuer\\\":1}},\\\"keyring\\\":1}\"}";
Object object = JSONObject.parseObject(message, mapObject.getClass());
// System.out.println("补全之前 ===》 "+JSON.toJSONString(object));
try {
for (String[] strings : jobList) {
//参数的值
Object use = JsonParseUtil.getValue(object, strings[0]);
//补全的字段的值
//用到的参数的值
Object name = JsonParseUtil.getValue(object, strings[0]);
//需要补全的字段的值
Object appendTo = JsonParseUtil.getValue(object, strings[1]);
//匹配操作函数的字段
String function=strings[2];
//额外的参数的值
Object param = null;
if (strings[3] != null){
param=JsonParseUtil.getValue(object, strings[3]);
}
if (strings[2].equals("current_timestamp")) {
if (function.equals("current_timestamp")) {
JsonParseUtil.setValue(object, strings[1], getCurrentTime());
} else if (strings[2].equals("snowflake_id")) {
JsonParseUtil.setValue(object, strings[1], getSnowflakeId());
} else if (strings[2].equals("geo_ip_detail")) {
JsonParseUtil.setValue(object, strings[1], getGeoIpDetail(use.toString()));
} else if (strings[2].equals("geo_asn")) {
JsonParseUtil.setValue(object, strings[1], getGeoAsn(use.toString()));
} else if (strings[2].equals("radius_match")) {
JsonParseUtil.setValue(object, strings[1], radiusMatch(use.toString()));
} else if (strings[2].equals("geo_ip_country")) {
JsonParseUtil.setValue(object, strings[1], getGeoIpCountry(use.toString()));
} else if (strings[0].equals("http_host") && strings[2].equals("sub_domain") && use != null) {
} else if (function.equals("snowflake_id")) {
JsonParseUtil.setValue(object, strings[1], getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS,FlowWriteConfig.KAFKA_TOPIC,FlowWriteConfig.DATA_CENTER_ID_NUM));
} else if (function.equals("geo_ip_detail")) {
JsonParseUtil.setValue(object, strings[1], getGeoIpDetail(name.toString()));
} else if (function.equals("geo_asn")) {
JsonParseUtil.setValue(object, strings[1], getGeoAsn(name.toString()));
} else if (function.equals("radius_match")) {
JsonParseUtil.setValue(object, strings[1], radiusMatch(name.toString(),FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS,FlowWriteConfig.HBASE_TABLE_NAME));
} else if (function.equals("geo_ip_country")) {
JsonParseUtil.setValue(object, strings[1], getGeoIpCountry(name.toString()));
} else if (function.equals("decode_of_base64") && param != null){
JsonParseUtil.setValue(object, strings[1], base64Str(name.toString(),param.toString()));
} else if (name.equals("http_host") && function.equals("sub_domain")) {
if (appendTo == null || StringUtil.isBlank(appendTo.toString())) {
JsonParseUtil.setValue(object, strings[1], getTopDomain(null, use.toString()));
JsonParseUtil.setValue(object, strings[1], getTopDomain(null, name.toString()));
}
} else if (strings[0].equals("ssl_sni") && strings[2].equals("sub_domain") && use != null) {
} else if (name.equals("ssl_sni") && strings[2].equals("sub_domain")) {
if (appendTo == null || StringUtil.isBlank(appendTo.toString())) {
JsonParseUtil.setValue(object, strings[1], getTopDomain(use.toString(), null));
JsonParseUtil.setValue(object, strings[1], getTopDomain(name.toString(), null));
}
}
@@ -114,131 +104,9 @@ public class TransFormUtils {
@Test
public void aaa() {
String sni = "203.187.160.131:9011";
String sni = "www.baidu.com";
System.out.println(getTopDomain(sni, null));
System.out.println(getTopDomain(null,sni));
}
/**
* 有sni通过sni获取域名有host根据host获取域名
*
* @param sni sni
* @param host host
* @return 顶级域名
*/
private static String getTopDomain(String sni, String host) {
if (StringUtil.isNotBlank(host)) {
return getDomainName(host);
} else if (StringUtil.isNotBlank(sni)) {
return getDomainName(sni);
} else {
return "";
}
}
/**
* 根据url截取顶级域名
*
* @param host 网站url
* @return 顶级域名
*/
private static String getDomainName(String host) {
String domain = "";
try {
domain = InternetDomainName.from(host).topPrivateDomain().toString();
} catch (Exception e) {
logger.error("host解析顶级域名异常: " + e.getMessage());
}
return domain;
}
/**
* 生成当前时间戳的操作
*/
private static int getCurrentTime() {
return (int)(System.currentTimeMillis() / 1000);
}
/**
* 雪花模型生成id
*
* @return
*/
private static long getSnowflakeId() {
return SnowflakeId.generateId();
}
/**
* 根据clientIp获取location信息
*
* @param ip
* @return
*/
private static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
/**
* 根据ip获取asn信息
*
* @param ip
* @return
*/
private static String getGeoAsn(String ip) {
return ipLookup.asnLookup(ip, true);
}
/**
* 根据ip获取country信息
*
* @param ip
* @return
*/
private static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
/**
* radius借助hbase补齐
*
* @param ip
* @return
*/
private static String radiusMatch(String ip) {
return HBaseUtils.getAccount(ip);
}
/**
* switch 匹配合适的方法
* current_timestamp
* snowflake_id
* geo_ip_detail
* geo_asn
* radius_match
* geo_ip_country
* geo_asn
* sub_domain
* sub_domain
* @param func
*/
//TODO 行不通的原因是无法统一一个确定的返回值类型
/* private static String switchFunc(String func){
switch (func){
case "current_timestamp":
return String.valueOf(getCurrentTime());
case "snowflake_id":
case "geo_ip_detail":
case "geo_asn":
case "radius_match":
case "geo_ip_country":
case "sub_domain":
}
return func;
}*/
}
}

View File

@@ -1,8 +1,5 @@
package cn.ac.iie.utils.hbase;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.system.IpUtils;
import io.netty.util.collection.IntObjectHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -15,36 +12,55 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* HBase 工具类
*
* @author qidaijie
*/
import java.util.Timer;
import java.util.TimerTask;
public class HBaseUtils {
private final static Logger logger = Logger.getLogger(HBaseUtils.class);
private static Map<String, String> subIdMap = new HashMap<>(333334);
// private static Map<String, String> subIdMap = new ConcurrentSkipListMap<>();
private static Connection connection;
private static Long time;
static {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
configuration.set("hbase.client.retries.number", "3");
configuration.set("hbase.bulkload.retries.number", "3");
configuration.set("zookeeper.recovery.retry", "3");
private static String zookeeperIp;
private static String hbaseTable;
private static HBaseUtils hBaseUtils;
private static void getHbaseInstance(String zookeeperServer, String hbaseTableName) {
hBaseUtils = new HBaseUtils(zookeeperServer, hbaseTableName);
}
/**
* 构造函数-新-20191023
*/
public HBaseUtils(String zookeeperServer, String hbaseTableName) {
zookeeperIp = zookeeperServer;
hbaseTable = hbaseTableName;
//获取连接
getHbaseConn();
//拉取所有
getAll();
//定时更新
updateHabaseCache();
}
private static void getHbaseConn() {
try {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
configuration.set("hbase.zookeeper.quorum", zookeeperIp);
configuration.set("hbase.client.retries.number", "3");
configuration.set("hbase.bulkload.retries.number", "3");
configuration.set("zookeeper.recovery.retry", "3");
connection = ConnectionFactory.createConnection(configuration);
time = System.currentTimeMillis();
getAll();
} catch (IOException e) {
logger.error("获取HBase连接失败");
logger.warn("HBaseUtils get HBase connection,now to getAll().");
} catch (IOException ioe) {
logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
ioe.printStackTrace();
} catch (Exception e) {
logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
e.printStackTrace();
}
}
@@ -53,7 +69,7 @@ public class HBaseUtils {
* 更新变量
*/
public static void change() {
Long nowTime = System.currentTimeMillis();
long nowTime = System.currentTimeMillis();
timestampsFilter(time - 1000, nowTime + 500);
}
@@ -69,7 +85,7 @@ public class HBaseUtils {
ResultScanner scanner = null;
Scan scan2 = new Scan();
try {
table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
table = connection.getTable(TableName.valueOf("sub:" + hbaseTable));
scan2.setTimeRange(startTime, endTime);
scanner = table.getScanner(scan2);
for (Result result : scanner) {
@@ -87,10 +103,14 @@ public class HBaseUtils {
}
}
Long end = System.currentTimeMillis();
logger.warn("当前集合长度" + subIdMap.keySet().size());
logger.warn("更新缓存耗时:" + (end - begin) + "开始时间:" + begin + "结束时间:" + end);
logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size());
logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + begin + ",EndTime: " + end);
time = endTime;
} catch (IOException e) {
} catch (IOException ioe) {
logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<===");
ioe.printStackTrace();
} catch (Exception e) {
logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<===");
e.printStackTrace();
} finally {
if (scanner != null) {
@@ -110,9 +130,9 @@ public class HBaseUtils {
* 获取所有的 key value
*/
private static void getAll() {
Long begin = System.currentTimeMillis();
long begin = System.currentTimeMillis();
try {
Table table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
Table table = connection.getTable(TableName.valueOf("sub:" + hbaseTable));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
@@ -121,10 +141,14 @@ public class HBaseUtils {
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
}
}
logger.warn("获取全量后集合长度:" + subIdMap.size());
logger.warn("获取全量耗时:" + (System.currentTimeMillis() - begin));
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin));
scanner.close();
} catch (IOException e) {
} catch (IOException ioe) {
logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
ioe.printStackTrace();
} catch (Exception e) {
logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
e.printStackTrace();
}
}
@@ -135,8 +159,26 @@ public class HBaseUtils {
* @param clientIp client_ip
* @return account
*/
public static String getAccount(String clientIp) {
public static String getAccount(String clientIp, String hbaseZookeeper, String hbaseTable) {
if (hBaseUtils == null) {
getHbaseInstance(hbaseZookeeper, hbaseTable);
}
return subIdMap.get(clientIp);
}
private void updateHabaseCache() {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
change();
} catch (Exception e) {
logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
e.printStackTrace();
}
}
}, 1, 1000 * 60);
}
}

View File

@@ -145,6 +145,7 @@ public class JsonParseUtil {
/**
* 根据http链接获取schema解析之后返回一个任务列表 (useList toList funcList)
*
* @param http
* @return
*/
@@ -167,25 +168,56 @@ public class JsonParseUtil {
Object format = JSON.parseObject(doc.toString()).get("format");
if (format != null) {
Object functions = JSON.parseObject(format.toString()).get("functions");
Object appendTo = JSON.parseObject(format.toString()).get("appendTo");
if (appendTo != null) {
String[] functionArray = functions.toString().split(",");
String[] appendToArray = appendTo.toString().split(",");
String functions = null;
String appendTo = null;
String params = null;
Object functionsObj = JSON.parseObject(format.toString()).get("functions");
Object appendToObj = JSON.parseObject(format.toString()).get("appendTo");
Object paramObj = JSON.parseObject(format.toString()).get("param");
if (functionsObj != null) {
functions = functionsObj.toString();
}
if (appendToObj != null) {
appendTo = appendToObj.toString();
}
if (paramObj != null) {
params = paramObj.toString();
}
if (appendTo != null && params == null) {
String[] functionArray = functions.split(",");
String[] appendToArray = appendTo.split(",");
for (int i = 0; i < functionArray.length; i++) {
// useList.add(name);
// toList.add(appendToArray[i]);
// funcList.add(functionArray[i]);
list.add(new String[]{name, appendToArray[i], functionArray[i]});
list.add(new String[]{name, appendToArray[i], functionArray[i],null});
}
} else {
}else if (appendTo != null && params != null){
String[] functionArray = functions.split(",");
String[] appendToArray = appendTo.split(",");
String[] paramArray = params.split(",");
for (int i = 0; i < functionArray.length; i++) {
// useList.add(name);
// toList.add(appendToArray[i]);
// funcList.add(functionArray[i]);
list.add(new String[]{name, appendToArray[i], functionArray[i],paramArray[i]});
}
}
else {
// useList.add(name);
// funcList.add(functions.toString());
// toList.add(name);
list.add(new String[]{name, name, functions.toString()});
list.add(new String[]{name, name, functions,params});
}
}

View File

@@ -1,20 +1,13 @@
package cn.ac.iie.utils.system;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.zookeeper.DistributedLock;
import cn.ac.iie.utils.zookeeper.ZooKeeperLock;
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
import org.apache.log4j.Logger;
/**
* 雪花算法
*
* @author qidaijie
*/
public class SnowflakeId {
private static Logger logger = Logger.getLogger(SnowflakeId.class);
// ==============================Fields===========================================
/**
* 开始时间截 (2018-08-01 00:00:00) max 17years
*/
@@ -31,7 +24,8 @@ public class SnowflakeId {
private final long dataCenterIdBits = 4L;
/**
* 支持的最大机器id结果是3 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
* 支持的最大机器id结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
* M << n = M * 2^n
*/
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
@@ -90,53 +84,36 @@ public class SnowflakeId {
private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
static {
idWorker = new SnowflakeId();
private static void getSnowflakeldInstance(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) {
idWorker = new SnowflakeId(zookeeperIp, kafkaTopic, dataCenterIdNum);
}
//==============================Constructors=====================================
/**
* 构造函数
*/
// private SnowflakeId() {
// DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
// lock.lock();
// int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
// if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
// throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
// }
// int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
// if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
// throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
// }
// this.workerId = tmpWorkerId;
// this.dataCenterId = dataCenterId;
// }
private SnowflakeId() {
ZooKeeperLock lock = new ZooKeeperLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "/locks", "disLocks");
private SnowflakeId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) {
ZooKeeperLock lock = new ZooKeeperLock(zookeeperIp, "/locks", "disLocks1");
if (lock.lock()) {
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + kafkaTopic, zookeeperIp);
if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId));
}
this.workerId = tmpWorkerId;
this.dataCenterId = dataCenterId;
this.dataCenterId = dataCenterIdNum;
try {
lock.unlock();
} catch (InterruptedException e) {
} catch (InterruptedException ie) {
ie.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
logger.error("This is not usual error!!!===>>>" + e + "<<<===");
}
}
}
// ==============================Methods==========================================
/**
* 获得下一个ID (该方法是线程安全的)
*
@@ -204,9 +181,10 @@ public class SnowflakeId {
*
* @return
*/
public static Long generateId() {
public static Long generateId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) {
if (idWorker == null) {
getSnowflakeldInstance(zookeeperIp, kafkaTopic, dataCenterIdNum);
}
return idWorker.nextId();
}
}

View File

@@ -1,7 +1,5 @@
package cn.ac.iie.utils.zookeeper;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.system.SnowflakeId;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
@@ -15,9 +13,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author qidaijie
*/
public class DistributedLock implements Lock, Watcher {
private static Logger logger = Logger.getLogger(DistributedLock.class);
@@ -83,7 +79,7 @@ public class DistributedLock implements Lock, Watcher {
}
try {
if (this.tryLock()) {
System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁");
logger.warn(Thread.currentThread().getName() + " " + lockName + " is being locked......");
} else {
// 等待锁
waitForLock(waitLock, sessionTimeout);
@@ -98,7 +94,7 @@ public class DistributedLock implements Lock, Watcher {
try {
String splitStr = "_lock_";
if (lockName.contains(splitStr)) {
throw new LockException("锁名有误");
throw new LockException("locked name is error!!!");
}
// 创建临时有序节点
currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
@@ -187,20 +183,4 @@ public class DistributedLock implements Lock, Watcher {
super(e);
}
}
public static void main(String[] args) {
ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
Runnable runnable = new Runnable() {
@Override
public void run() {
DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
lock.lock();
lock.unlock();
}
};
for (int i = 0; i < 10; i++) {
Thread t = new Thread(runnable);
t.start();
}
}
}

View File

@@ -1,5 +1,6 @@
package cn.ac.iie.utils.zookeeper;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
@@ -11,6 +12,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class ZooKeeperLock implements Watcher {
private static Logger logger = Logger.getLogger(ZooKeeperLock.class);
private ZooKeeper zk = null;
private String rootLockNode; // 锁的根节点
private String lockName; // 竞争资源,用来生成子节点名称
@@ -33,13 +36,14 @@ public class ZooKeeperLock implements Watcher {
}
} catch (IOException | InterruptedException | KeeperException e) {
e.printStackTrace();
logger.error("ZooKeeperLock Constructors ===>>>Node already exists!");
}
}
// 2. 加锁方法,先尝试加锁,不能加锁则等待上一个锁的释放
public boolean lock() {
if (this.tryLock()) {
System.out.println("线程【" + Thread.currentThread().getName() + "】加锁(" + this.currentLock + ")成功");
logger.warn("ZooKeeperLock method lock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] addZkLock(" + this.currentLock + ")success");
return true;
} else {
return waitOtherLock(this.waitLock, this.sessionTimeout);
@@ -56,8 +60,7 @@ public class ZooKeeperLock implements Watcher {
// 创建锁节点(临时有序节点)
this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("线程【" + Thread.currentThread().getName()
+ "】创建锁节点(" + this.currentLock + ")成功,开始竞争...");
logger.warn("ZooKeeperLock method tryLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] create zkLockNode(" + this.currentLock + ")successbegin to election...");
// 取所有子节点
List<String> nodes = zk.getChildren(this.rootLockNode, false);
// 取所有竞争lockName的锁
@@ -90,18 +93,15 @@ public class ZooKeeperLock implements Watcher {
String waitLockNode = this.rootLockNode + "/" + waitLock;
Stat stat = zk.exists(waitLockNode, true);
if (null != stat) {
System.out.println("线程【" + Thread.currentThread().getName()
+ "】锁(" + this.currentLock + ")加锁失败,等待锁(" + waitLockNode + ")释放...");
logger.error("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] zkLock(" + this.currentLock + ")addZkLock fail,wait lock(" + waitLockNode + ")release...");
// 设置计数器,使用计数器阻塞线程
this.countDownLatch = new CountDownLatch(1);
islock = this.countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
if (islock) {
System.out.println("线程【" + Thread.currentThread().getName() + "】锁("
+ this.currentLock + ")加锁成功,锁(" + waitLockNode + ")已经释放");
logger.warn("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] zkLock(" + this.currentLock + ")addZkLock success,lock(" + waitLockNode + ")release over.");
} else {
System.out.println("线程【" + Thread.currentThread().getName() + "】锁("
+ this.currentLock + ")加锁失败...");
logger.error("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] zkLock(" + this.currentLock + "addZkLock fail...");
}
} else {
islock = true;
@@ -117,7 +117,7 @@ public class ZooKeeperLock implements Watcher {
try {
Stat stat = zk.exists(this.currentLock, false);
if (null != stat) {
System.out.println("线程【" + Thread.currentThread().getName() + "】释放锁 " + this.currentLock);
logger.warn("ZooKeeperLock method unlock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] free zkLock " + this.currentLock);
zk.delete(this.currentLock, -1);
this.currentLock = null;
}
@@ -137,34 +137,4 @@ public class ZooKeeperLock implements Watcher {
}
}
public static void doSomething() {
}
public static void main(String[] args) {
Runnable runnable = new Runnable() {
@Override
public void run() {
ZooKeeperLock lock = null;
lock = new ZooKeeperLock("192.168.40.119:2181", "/locks", "test1");
if (lock.lock()) {
doSomething();
try {
// Thread.sleep(1000);
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
for (int i = 0; i < 10; i++) {
Thread t = new Thread(runnable);
t.start();
}
}
}

View File

@@ -1,6 +1,5 @@
package cn.ac.iie.utils.zookeeper;
import cn.ac.iie.common.FlowWriteConfig;
import org.apache.commons.lang3.RandomUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
@@ -11,9 +10,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author qidaijie
*/
public class ZookeeperUtils implements Watcher {
private static Logger logger = Logger.getLogger(ZookeeperUtils.class);
@@ -36,12 +33,11 @@ public class ZookeeperUtils implements Watcher {
*
* @param path 节点路径
*/
public int modifyNode(String path) {
createNode("/Snowflake", null, ZooDefs.Ids.OPEN_ACL_UNSAFE);
createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
int workerId;
public int modifyNode(String path, String zookeeperIp) {
createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp);
int workerId = 0;
try {
connectZookeeper();
connectZookeeper(zookeeperIp);
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
if (workerId > 55) {
@@ -58,20 +54,22 @@ public class ZookeeperUtils implements Watcher {
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
workerId = RandomUtils.nextInt(56, 63);
} finally {
}
finally {
closeConn();
}
logger.error("工作ID是" + workerId);
logger.warn("workerID is" + workerId);
return workerId;
}
/**
* 连接zookeeper
*
* @param host 地址
*/
private void connectZookeeper() {
public void connectZookeeper(String host) {
try {
zookeeper = new ZooKeeper(FlowWriteConfig.ZOOKEEPER_SERVERS, SESSION_TIME_OUT, this);
zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
@@ -81,7 +79,7 @@ public class ZookeeperUtils implements Watcher {
/**
* 关闭连接
*/
private void closeConn() {
public void closeConn() {
try {
if (zookeeper != null) {
zookeeper.close();
@@ -97,7 +95,7 @@ public class ZookeeperUtils implements Watcher {
* @param path 节点路径
* @return 内容/异常null
*/
private String getNodeDate(String path) {
public String getNodeDate(String path) {
String result = null;
Stat stat = new Stat();
try {
@@ -115,14 +113,18 @@ public class ZookeeperUtils implements Watcher {
* @param date 节点所存储的数据的byte[]
* @param acls 控制权限策略
*/
private void createNode(String path, byte[] date, List<ACL> acls) {
public void createNode(String path, byte[] date, List<ACL> acls, String zookeeperIp) {
try {
connectZookeeper();
connectZookeeper(zookeeperIp);
Stat exists = zookeeper.exists(path, true);
if (exists == null) {
Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true);
if (existsSnowflakeld == null) {
zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT);
}
zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
} else {
logger.warn("Node already exists!,Don't need to create");
logger.warn("Node already exists ! Don't need to create");
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();