diff --git a/.idea/artifacts/completeUtil.xml b/.idea/artifacts/completeUtil.xml new file mode 100644 index 0000000..f6f9995 --- /dev/null +++ b/.idea/artifacts/completeUtil.xml @@ -0,0 +1,12 @@ + + + $PROJECT_DIR$/out/artifacts/completeUtil + + + + + + + + + \ No newline at end of file diff --git a/.idea/compiler.xml b/.idea/compiler.xml new file mode 100644 index 0000000..7603751 --- /dev/null +++ b/.idea/compiler.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..b26911b --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 1e1bf00..90b80e1 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,13 +1,14 @@ #管理kafka地址 -bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092 -#bootstrap.servers=192.168.40.186:9092 +#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092 +bootstrap.servers=192.168.40.151:9092 #zookeeper 地址 -zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 +zookeeper.servers=192.168.40.151:2181 +#zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 #hbase zookeeper地址 #hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 -hbase.zookeeper.servers=192.168.40.224:2182 +hbase.zookeeper.servers=192.168.40.151:2181 #hbase tablename hbase.table.name=subscriber_info @@ -19,7 +20,7 @@ auto.offset.reset=latest kafka.topic=SECURITY-EVENT-LOG #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=security-policy-191216 
+group.id=security-policy-200204 #输出topic results.output.topic=SECURITY-EVENT-COMPLETED-LOG @@ -43,7 +44,7 @@ ip.library=D:\\dat\\ #kafka批量条数 batch.insert.num=2000 #网关的schema位置 -schema.http=http://192.168.40.224:9999/metadata/schema/v1/fields/security_event_log +schema.http=http://192.168.40.151:9999/metadata/schema/v1/fields/security_event_log #数据中心(UID) data.center.id.num=15 diff --git a/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java index f89801a..56f8f8b 100644 --- a/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java +++ b/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java @@ -1,6 +1,7 @@ package cn.ac.iie.bolt.proxy; import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.hbase.HBaseUtils; import cn.ac.iie.utils.system.TupleUtils; import com.zdjizhi.utils.StringUtil; import org.apache.log4j.Logger; diff --git a/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java index 9a6b303..7a4182e 100644 --- a/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java +++ b/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java @@ -39,7 +39,8 @@ public class SecurityCompletionBolt extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { try { if (TupleUtils.isTick(tuple)) { - HBaseUtils.change(); + HBaseUtils hBaseUtils = new HBaseUtils(FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS, FlowWriteConfig.HBASE_TABLE_NAME); + hBaseUtils.change(); } else { String message = tuple.getString(0); if (StringUtil.isNotBlank(message)) { diff --git a/src/main/java/cn/ac/iie/utils/general/CompleteUtil.java b/src/main/java/cn/ac/iie/utils/general/CompleteUtil.java new file mode 100644 index 0000000..b786a4a --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/general/CompleteUtil.java @@ -0,0 +1,143 @@ +package cn.ac.iie.utils.general; + +import 
cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.hbase.HBaseUtils; +import cn.ac.iie.utils.system.SnowflakeId; +import com.google.common.net.InternetDomainName; +import com.zdjizhi.utils.IpLookup; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; + +import java.util.Base64; + + +public final class CompleteUtil { + + private static Logger logger = Logger.getLogger(TransFormUtils.class); + private static IpLookup ipLookup = new IpLookup.Builder(false) + .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") + .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") + .loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb") + .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb") + .build(); + + /** + * 有sni通过sni获取域名,有host根据host获取域名 + * + * @param sni sni + * @param host host + * @return 顶级域名 + */ + public static String getTopDomain(String sni, String host) { + if (StringUtil.isNotBlank(host)) { + return getDomainName(host); + } else if (StringUtil.isNotBlank(sni)) { + return getDomainName(sni); + } else { + return ""; + } + } + + + /** + * 根据url截取顶级域名 + * + * @param host 网站url + * @return 顶级域名 + */ + public static String getDomainName(String host) { + String domain = ""; + try { + domain = InternetDomainName.from(host).topPrivateDomain().toString(); + } catch (Exception e) { + logger.error("host解析顶级域名异常: " + e.getMessage()); + } + return domain; + } + + /** + * 生成当前时间戳的操作 + */ + public static int getCurrentTime() { + return (int)(System.currentTimeMillis() / 1000); + } + + /** + * 雪花模型生成id + * + * @return + */ + public static long getSnowflakeId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) { + + return SnowflakeId.generateId(zookeeperIp,kafkaTopic,dataCenterIdNum); + } + + /** + * 根据clientIp获取location信息 + * + * @param ip + * @return + */ + public static String getGeoIpDetail(String ip) { + + return ipLookup.cityLookupDetail(ip); + } + + /** + * 根据ip获取asn信息 + * + * @param ip + * @return asn + 
*/ + public static String getGeoAsn(String ip) { + + return ipLookup.asnLookup(ip, true); + } + + /** + * 根据ip获取country信息 + * + * @param ip + * @return country + */ + public static String getGeoIpCountry(String ip) { + + return ipLookup.countryLookup(ip); + } + + /** + * 根据ip去hbase中匹配对应的用户名 + * @param clientIp + * @param hbaseZookeeper + * @param hbaseTable + * @return 用户名 subscriber_id + */ + public static String radiusMatch(String clientIp, String hbaseZookeeper, String hbaseTable) { + return HBaseUtils.getAccount(clientIp,hbaseZookeeper,hbaseTable); + } + + + /** + * base64 解码 + * + * @param encodedText mail subject + * @param subjectCharset 编码格式 + * @return 解码内容 / 空 + */ + public static String base64Str(String encodedText, String subjectCharset) { + Base64.Decoder decoder = Base64.getDecoder(); + String sub; + try { + if (StringUtil.isNotBlank(subjectCharset)) { + sub = new String(decoder.decode(encodedText), subjectCharset); + } else { + sub = new String(decoder.decode(encodedText), "UTF-8"); + } + return sub; + } catch (Exception e) { + logger.error("transform base64 String failed! 
base64Str = " + encodedText + "Charset = " + subjectCharset + "error :" + e); + return ""; + } + } + +} diff --git a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java index 0f66dc5..fb028eb 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java +++ b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java @@ -15,6 +15,8 @@ import org.junit.jupiter.api.Test; import java.util.*; import java.util.regex.Pattern; +import static cn.ac.iie.utils.general.CompleteUtil.*; + /** * 描述:转换或补全工具类 @@ -25,16 +27,6 @@ import java.util.regex.Pattern; public class TransFormUtils { private static Logger logger = Logger.getLogger(TransFormUtils.class); - private final static Set PUBLIC_SUFFIX_SET = new HashSet( - Arrays.asList("com|org|net|gov|edu|co|tv|mobi|info|asia|xxx|onion|cc|cn|com.cn|edu.cn|gov.cn|net.cn|org.cn|jp|kr|tw|com.hk|hk|com.hk|org.hk|se|com.se|org.se" - .split("\\|"))); - private static Pattern IP_PATTERN = Pattern.compile("(\\d{1,3}\\.){3}(\\d{1,3})"); - private static IpLookup ipLookup = new IpLookup.Builder(false) - .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") - .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") - .loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb") - .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb") - .build(); //在内存中加载反射类用的map private static HashMap map = JsonParseUtil.getMapFromhttp(FlowWriteConfig.SCHEMA_HTTP); @@ -45,55 +37,53 @@ public class TransFormUtils { /** * 解析日志,并补全 - * 补domain,补subscriber_id * - * @param message Security原始日志 + * @param message kafka Topic原始日志 * @return 补全后的日志 - *

- * current_timestamp - * snowflake_id - * geo_ip_detail - * geo_asn - * radius_match - * geo_ip_country - * geo_asn - * sub_domain - * sub_domain */ public static String dealCommonMessage(String message) { -// message="{\"ssl_sni\":\"pos.baidu.com\",\"ssl_version\":\"v3\",\"ssl_cn\":\"baidu.com\",\"ssl_san\":\"baidu.com;click.hm.baidu.com;cm.pos.baidu.com;log.hm.baidu.com;update.pan.baidu.com;wn.pos.baidu.com;*.91.com;*.aipage.cn;*.aipage.com;*.apollo.auto;*.baidu.com;*.baidubce.com;*.baiducontent.com;*.baidupcs.com;*.baidustatic.com;*.baifubao.com;*.bce.baidu.com;*.bcehost.com;*.bdimg.com;*.bdstatic.com;*.bdtjrcv.com;*.bj.baidubce.com;*.chuanke.com;*.dlnel.com;*.dlnel.org;*.dueros.baidu.com;*.eyun.baidu.com;*.fanyi.baidu.com;*.gz.baidubce.com;*.hao123.baidu.com;*.hao123.com;*.hao222.com;*.haokan.com;*.im.baidu.com;*.map.baidu.com;*.mbd.baidu.com;*.mipcdn.com;*.news.baidu.com;*.nuomi.com;*.safe.baidu.com;*.smartapps.cn;*.su.baidu.com;*.trustgo.com;*.xueshu.baidu.com;apollo.auto;baifubao.com;dwz.cn;mct.y.nuomi.com;www.baidu.cn;www.baidu.com.cn\",\"common_schema_type\":\"SSL\",\"common_server_ip\":\"182.61.200.109\",\"common_client_ip\":\"192.168.50.144\",\"common_server_port\":443,\"common_client_port\":50529,\"common_stream_dir\":3,\"common_address_type\":4,\"common_s2c_pkt_num\":46,\"common_s2c_byte_num\":33149,\"common_c2s_pkt_num\":23,\"common_c2s_byte_num\":6147,\"common_start_time\":1576744784,\"common_end_time\":1576744799,\"common_con_duration_ms\":15000,\"common_stream_trace_id\":7686307990192,\"common_l4_protocol\":\"IPv4_TCP\",\"common_address_list\":\"50529-443-192.168.50.144-182.61.200.109\",\"common_sled_ip\":\"192.168.40.21\",\"common_policy_id\":172,\"common_service\":0,\"common_action\":2,\"common_user_region\":\"{\\\"protocol\\\":\\\"SSL\\\",\\\"protocol_version\\\":{\\\"allow_http2\\\":1,\\\"min\\\":\\\"ssl3\\\",\\\"max\\\":\\\"tls13\\\",\\\"mirror_client\\\":1},\\\"dynamic_bypass\\\":{\\\"mutual_authentication\\\":1,\\\"cert_pinning\\\":1,\\\"cer
t_transparency\\\":0,\\\"protocol_errors\\\":1,\\\"ev_cert\\\":0},\\\"decrypt_mirror\\\":{\\\"enable\\\":0},\\\"certificate_checks\\\":{\\\"fail_action\\\":\\\"pass-through\\\",\\\"approach\\\":{\\\"self-signed\\\":1,\\\"expiration\\\":1,\\\"cn\\\":1,\\\"issuer\\\":1}},\\\"keyring\\\":1}\"}"; Object object = JSONObject.parseObject(message, mapObject.getClass()); // System.out.println("补全之前 ===》 "+JSON.toJSONString(object)); try { for (String[] strings : jobList) { - //参数的值 - Object use = JsonParseUtil.getValue(object, strings[0]); - //补全的字段的值 + //用到的参数的值 + Object name = JsonParseUtil.getValue(object, strings[0]); + //需要补全的字段的值 Object appendTo = JsonParseUtil.getValue(object, strings[1]); + //匹配操作函数的字段 + String function=strings[2]; + //额外的参数的值 + Object param = null; + if (strings[3] != null){ + param=JsonParseUtil.getValue(object, strings[3]); + } - if (strings[2].equals("current_timestamp")) { + + if (function.equals("current_timestamp")) { JsonParseUtil.setValue(object, strings[1], getCurrentTime()); - } else if (strings[2].equals("snowflake_id")) { - JsonParseUtil.setValue(object, strings[1], getSnowflakeId()); - } else if (strings[2].equals("geo_ip_detail")) { - JsonParseUtil.setValue(object, strings[1], getGeoIpDetail(use.toString())); - } else if (strings[2].equals("geo_asn")) { - JsonParseUtil.setValue(object, strings[1], getGeoAsn(use.toString())); - } else if (strings[2].equals("radius_match")) { - JsonParseUtil.setValue(object, strings[1], radiusMatch(use.toString())); - } else if (strings[2].equals("geo_ip_country")) { - JsonParseUtil.setValue(object, strings[1], getGeoIpCountry(use.toString())); - } else if (strings[0].equals("http_host") && strings[2].equals("sub_domain") && use != null) { + } else if (function.equals("snowflake_id")) { + JsonParseUtil.setValue(object, strings[1], getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS,FlowWriteConfig.KAFKA_TOPIC,FlowWriteConfig.DATA_CENTER_ID_NUM)); + } else if (function.equals("geo_ip_detail")) { + 
JsonParseUtil.setValue(object, strings[1], getGeoIpDetail(name.toString())); + } else if (function.equals("geo_asn")) { + JsonParseUtil.setValue(object, strings[1], getGeoAsn(name.toString())); + } else if (function.equals("radius_match")) { + JsonParseUtil.setValue(object, strings[1], radiusMatch(name.toString(),FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS,FlowWriteConfig.HBASE_TABLE_NAME)); + } else if (function.equals("geo_ip_country")) { + JsonParseUtil.setValue(object, strings[1], getGeoIpCountry(name.toString())); + } else if (function.equals("decode_of_base64") && param != null){ + JsonParseUtil.setValue(object, strings[1], base64Str(name.toString(),param.toString())); + } else if (strings[0].equals("http_host") && function.equals("sub_domain") && name != null) { if (appendTo == null || StringUtil.isBlank(appendTo.toString())) { - JsonParseUtil.setValue(object, strings[1], getTopDomain(null, use.toString())); + JsonParseUtil.setValue(object, strings[1], getTopDomain(null, name.toString())); } - } else if (strings[0].equals("ssl_sni") && strings[2].equals("sub_domain") && use != null) { + } else if (strings[0].equals("ssl_sni") && strings[2].equals("sub_domain") && name != null) { if (appendTo == null || StringUtil.isBlank(appendTo.toString())) { - JsonParseUtil.setValue(object, strings[1], getTopDomain(use.toString(), null)); + JsonParseUtil.setValue(object, strings[1], getTopDomain(name.toString(), null)); } } @@ -114,131 +104,9 @@ public class TransFormUtils { @Test public void aaa() { - String sni = "203.187.160.131:9011"; + String sni = "www.baidu.com"; System.out.println(getTopDomain(sni, null)); System.out.println(getTopDomain(null,sni)); } - - /** - * 有sni通过sni获取域名,有host根据host获取域名 - * - * @param sni sni - * @param host host - * @return 顶级域名 - */ - private static String getTopDomain(String sni, String host) { - if (StringUtil.isNotBlank(host)) { - return getDomainName(host); - } else if (StringUtil.isNotBlank(sni)) { - return getDomainName(sni); - } else { - return ""; - } - } - - - /** - * 
根据url截取顶级域名 - * - * @param host 网站url - * @return 顶级域名 - */ - private static String getDomainName(String host) { - String domain = ""; - try { - domain = InternetDomainName.from(host).topPrivateDomain().toString(); - } catch (Exception e) { - logger.error("host解析顶级域名异常: " + e.getMessage()); - } - return domain; - } - - /** - * 生成当前时间戳的操作 - */ - private static int getCurrentTime() { - return (int)(System.currentTimeMillis() / 1000); - } - - /** - * 雪花模型生成id - * - * @return - */ - private static long getSnowflakeId() { - - return SnowflakeId.generateId(); - } - - /** - * 根据clientIp获取location信息 - * - * @param ip - * @return - */ - private static String getGeoIpDetail(String ip) { - - return ipLookup.cityLookupDetail(ip); - } - - /** - * 根据ip获取asn信息 - * - * @param ip - * @return - */ - private static String getGeoAsn(String ip) { - - return ipLookup.asnLookup(ip, true); - } - - /** - * 根据ip获取country信息 - * - * @param ip - * @return - */ - private static String getGeoIpCountry(String ip) { - - return ipLookup.countryLookup(ip); - } - - /** - * radius借助hbase补齐 - * - * @param ip - * @return - */ - private static String radiusMatch(String ip) { - return HBaseUtils.getAccount(ip); - } - - /** - * switch 匹配合适的方法 - * current_timestamp - * snowflake_id - * geo_ip_detail - * geo_asn - * radius_match - * geo_ip_country - * geo_asn - * sub_domain - * sub_domain - * @param func - */ -//TODO 行不通的原因是无法统一一个确定的返回值类型 - /* private static String switchFunc(String func){ - switch (func){ - case "current_timestamp": - return String.valueOf(getCurrentTime()); - case "snowflake_id": - case "geo_ip_detail": - case "geo_asn": - case "radius_match": - case "geo_ip_country": - case "sub_domain": - } - return func; - }*/ -} +} \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java index f386003..5c0e4c9 100644 --- a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java +++ 
b/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java @@ -1,8 +1,5 @@ package cn.ac.iie.utils.hbase; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.system.IpUtils; -import io.netty.util.collection.IntObjectHashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -15,36 +12,55 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentSkipListMap; - - -/** - * HBase 工具类 - * - * @author qidaijie - */ +import java.util.Timer; +import java.util.TimerTask; public class HBaseUtils { private final static Logger logger = Logger.getLogger(HBaseUtils.class); private static Map subIdMap = new HashMap<>(333334); - // private static Map subIdMap = new ConcurrentSkipListMap<>(); private static Connection connection; private static Long time; - static { - // 管理Hbase的配置信息 - Configuration configuration = HBaseConfiguration.create(); - // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS); - configuration.set("hbase.client.retries.number", "3"); - configuration.set("hbase.bulkload.retries.number", "3"); - configuration.set("zookeeper.recovery.retry", "3"); + private static String zookeeperIp; + private static String hbaseTable; + + private static HBaseUtils hBaseUtils; + + private static void getHbaseInstance(String zookeeperServer, String hbaseTableName) { + hBaseUtils = new HBaseUtils(zookeeperServer, hbaseTableName); + } + + /** + * 构造函数-新-20191023 + */ + public HBaseUtils(String zookeeperServer, String hbaseTableName) { + zookeeperIp = zookeeperServer; + hbaseTable = hbaseTableName; + //获取连接 + getHbaseConn(); + //拉取所有 + getAll(); + //定时更新 + updateHabaseCache(); + } + + private static void getHbaseConn() { try { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + 
configuration.set("hbase.zookeeper.quorum", zookeeperIp); + configuration.set("hbase.client.retries.number", "3"); + configuration.set("hbase.bulkload.retries.number", "3"); + configuration.set("zookeeper.recovery.retry", "3"); connection = ConnectionFactory.createConnection(configuration); time = System.currentTimeMillis(); - getAll(); - } catch (IOException e) { - logger.error("获取HBase连接失败"); + logger.warn("HBaseUtils get HBase connection,now to getAll()."); + } catch (IOException ioe) { + logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<==="); + ioe.printStackTrace(); + } catch (Exception e) { + logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<==="); e.printStackTrace(); } } @@ -53,7 +69,7 @@ public class HBaseUtils { * 更新变量 */ public static void change() { - Long nowTime = System.currentTimeMillis(); + long nowTime = System.currentTimeMillis(); timestampsFilter(time - 1000, nowTime + 500); } @@ -69,7 +85,7 @@ public class HBaseUtils { ResultScanner scanner = null; Scan scan2 = new Scan(); try { - table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME)); + table = connection.getTable(TableName.valueOf("sub:" + hbaseTable)); scan2.setTimeRange(startTime, endTime); scanner = table.getScanner(scan2); for (Result result : scanner) { @@ -87,10 +103,14 @@ public class HBaseUtils { } } Long end = System.currentTimeMillis(); - logger.warn("当前集合长度" + subIdMap.keySet().size()); - logger.warn("更新缓存耗时:" + (end - begin) + "开始时间:" + begin + "结束时间:" + end); + logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size()); + logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + begin + ",EndTime: " + end); time = endTime; - } catch (IOException e) { + } catch (IOException ioe) { + logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<==="); + ioe.printStackTrace(); + } catch (Exception e) { + logger.error("HBaseUtils 
timestampsFilter is Exception===>{" + e + "}<==="); e.printStackTrace(); } finally { if (scanner != null) { @@ -110,9 +130,9 @@ public class HBaseUtils { * 获取所有的 key value */ private static void getAll() { - Long begin = System.currentTimeMillis(); + long begin = System.currentTimeMillis(); try { - Table table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME)); + Table table = connection.getTable(TableName.valueOf("sub:" + hbaseTable)); Scan scan2 = new Scan(); ResultScanner scanner = table.getScanner(scan2); for (Result result : scanner) { @@ -121,10 +141,14 @@ public class HBaseUtils { subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } - logger.warn("获取全量后集合长度:" + subIdMap.size()); - logger.warn("获取全量耗时:" + (System.currentTimeMillis() - begin)); + logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size()); + logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin)); scanner.close(); - } catch (IOException e) { + } catch (IOException ioe) { + logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<==="); + ioe.printStackTrace(); + } catch (Exception e) { + logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<==="); e.printStackTrace(); } } @@ -135,8 +159,26 @@ public class HBaseUtils { * @param clientIp client_ip * @return account */ - public static String getAccount(String clientIp) { + public static String getAccount(String clientIp, String hbaseZookeeper, String hbaseTable) { + if (hBaseUtils == null) { + getHbaseInstance(hbaseZookeeper, hbaseTable); + } return subIdMap.get(clientIp); } + private void updateHabaseCache() { + Timer timer = new Timer(); + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + change(); + } catch (Exception e) { + logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<==="); + 
e.printStackTrace(); + } + } + }, 1, 1000 * 60); + } + } diff --git a/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java b/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java index 3d9ede5..9ef09fc 100644 --- a/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java +++ b/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java @@ -145,6 +145,7 @@ public class JsonParseUtil { /** * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList) + * * @param http * @return */ @@ -167,25 +168,56 @@ public class JsonParseUtil { Object format = JSON.parseObject(doc.toString()).get("format"); if (format != null) { - Object functions = JSON.parseObject(format.toString()).get("functions"); - Object appendTo = JSON.parseObject(format.toString()).get("appendTo"); - if (appendTo != null) { - String[] functionArray = functions.toString().split(","); - String[] appendToArray = appendTo.toString().split(","); + String functions = null; + String appendTo = null; + String params = null; + Object functionsObj = JSON.parseObject(format.toString()).get("functions"); + Object appendToObj = JSON.parseObject(format.toString()).get("appendTo"); + Object paramObj = JSON.parseObject(format.toString()).get("param"); + + if (functionsObj != null) { + functions = functionsObj.toString(); + } + + if (appendToObj != null) { + appendTo = appendToObj.toString(); + } + if (paramObj != null) { + params = paramObj.toString(); + } + + + if (appendTo != null && params == null) { + String[] functionArray = functions.split(","); + String[] appendToArray = appendTo.split(","); for (int i = 0; i < functionArray.length; i++) { // useList.add(name); // toList.add(appendToArray[i]); // funcList.add(functionArray[i]); - list.add(new String[]{name, appendToArray[i], functionArray[i]}); + list.add(new String[]{name, appendToArray[i], functionArray[i],null}); } - } else { + }else if (appendTo != null && params != null){ + String[] functionArray = functions.split(","); + String[] appendToArray = appendTo.split(","); + 
String[] paramArray = params.split(","); + + for (int i = 0; i < functionArray.length; i++) { +// useList.add(name); +// toList.add(appendToArray[i]); +// funcList.add(functionArray[i]); + list.add(new String[]{name, appendToArray[i], functionArray[i],paramArray[i]}); + + } + } + + else { // useList.add(name); // funcList.add(functions.toString()); // toList.add(name); - list.add(new String[]{name, name, functions.toString()}); + list.add(new String[]{name, name, functions,params}); } } diff --git a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java index 5c3462d..8d67348 100644 --- a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java +++ b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java @@ -1,20 +1,13 @@ package cn.ac.iie.utils.system; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.zookeeper.DistributedLock; import cn.ac.iie.utils.zookeeper.ZooKeeperLock; import cn.ac.iie.utils.zookeeper.ZookeeperUtils; import org.apache.log4j.Logger; -/** - * 雪花算法 - * - * @author qidaijie - */ + public class SnowflakeId { private static Logger logger = Logger.getLogger(SnowflakeId.class); - // ==============================Fields=========================================== /** * 开始时间截 (2018-08-01 00:00:00) max 17years */ @@ -31,7 +24,8 @@ public class SnowflakeId { private final long dataCenterIdBits = 4L; /** - * 支持的最大机器id,结果是3 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) + * 支持的最大机器id,结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) + * M << n = M * 2^n */ private final long maxWorkerId = -1L ^ (-1L << workerIdBits); @@ -90,53 +84,36 @@ public class SnowflakeId { private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); - static { - idWorker = new SnowflakeId(); + private static void getSnowflakeldInstance(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) { + idWorker = new SnowflakeId(zookeeperIp, kafkaTopic, dataCenterIdNum); } - 
//==============================Constructors===================================== - /** * 构造函数 */ -// private SnowflakeId() { -// DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); -// lock.lock(); -// int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC); -// if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { -// throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); -// } -// int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM; -// if (dataCenterId > maxDataCenterId || dataCenterId < 0) { -// throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId)); -// } -// this.workerId = tmpWorkerId; -// this.dataCenterId = dataCenterId; -// } - - private SnowflakeId() { - ZooKeeperLock lock = new ZooKeeperLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "/locks", "disLocks"); + private SnowflakeId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) { + ZooKeeperLock lock = new ZooKeeperLock(zookeeperIp, "/locks", "disLocks1"); if (lock.lock()) { - int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC); + int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + kafkaTopic, zookeeperIp); if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); } - int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM; - if (dataCenterId > maxDataCenterId || dataCenterId < 0) { - throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId)); + if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) { + throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId)); } this.workerId = tmpWorkerId; - this.dataCenterId = 
dataCenterId; + this.dataCenterId = dataCenterIdNum; try { lock.unlock(); - } catch (InterruptedException e) { + } catch (InterruptedException ie) { + ie.printStackTrace(); + } catch (Exception e) { e.printStackTrace(); + logger.error("This is not usual error!!!===>>>" + e + "<<<==="); } } } - // ==============================Methods========================================== - /** * 获得下一个ID (该方法是线程安全的) * @@ -204,9 +181,10 @@ public class SnowflakeId { * * @return */ - public static Long generateId() { + public static Long generateId(String zookeeperIp, String kafkaTopic, long dataCenterIdNum) { + if (idWorker == null) { + getSnowflakeldInstance(zookeeperIp, kafkaTopic, dataCenterIdNum); + } return idWorker.nextId(); } - - } \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java b/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java index f85a84e..46a7ff2 100644 --- a/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java +++ b/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java @@ -1,7 +1,5 @@ package cn.ac.iie.utils.zookeeper; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.system.SnowflakeId; import org.apache.log4j.Logger; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; @@ -15,9 +13,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; -/** - * @author qidaijie - */ + public class DistributedLock implements Lock, Watcher { private static Logger logger = Logger.getLogger(DistributedLock.class); @@ -83,7 +79,7 @@ public class DistributedLock implements Lock, Watcher { } try { if (this.tryLock()) { - System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁"); + logger.warn(Thread.currentThread().getName() + " " + lockName + " is being locked......"); } else { // 等待锁 waitForLock(waitLock, sessionTimeout); @@ -98,7 +94,7 @@ public class DistributedLock implements 
Lock, Watcher { try { String splitStr = "_lock_"; if (lockName.contains(splitStr)) { - throw new LockException("锁名有误"); + throw new LockException("locked name is error!!!"); } // 创建临时有序节点 currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], @@ -187,20 +183,4 @@ public class DistributedLock implements Lock, Watcher { super(e); } } - - public static void main(String[] args) { - ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); - Runnable runnable = new Runnable() { - @Override - public void run() { - DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); - lock.lock(); - lock.unlock(); - } - }; - for (int i = 0; i < 10; i++) { - Thread t = new Thread(runnable); - t.start(); - } - } } \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java b/src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java index 6cff343..8d0a4d5 100644 --- a/src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java +++ b/src/main/java/cn/ac/iie/utils/zookeeper/ZooKeeperLock.java @@ -1,5 +1,6 @@ package cn.ac.iie.utils.zookeeper; +import org.apache.log4j.Logger; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; @@ -11,6 +12,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class ZooKeeperLock implements Watcher { + private static Logger logger = Logger.getLogger(ZooKeeperLock.class); + private ZooKeeper zk = null; private String rootLockNode; // 锁的根节点 private String lockName; // 竞争资源,用来生成子节点名称 @@ -33,13 +36,14 @@ public class ZooKeeperLock implements Watcher { } } catch (IOException | InterruptedException | KeeperException e) { e.printStackTrace(); + logger.error("ZooKeeperLock Constructors ===>>>Node already exists!"); } } // 2. 
加锁方法,先尝试加锁,不能加锁则等待上一个锁的释放 public boolean lock() { if (this.tryLock()) { - System.out.println("线程【" + Thread.currentThread().getName() + "】加锁(" + this.currentLock + ")成功!"); + logger.warn("ZooKeeperLock method lock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] addZkLock(" + this.currentLock + ")success!"); return true; } else { return waitOtherLock(this.waitLock, this.sessionTimeout); @@ -56,8 +60,7 @@ public class ZooKeeperLock implements Watcher { // 创建锁节点(临时有序节点) this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); - System.out.println("线程【" + Thread.currentThread().getName() - + "】创建锁节点(" + this.currentLock + ")成功,开始竞争..."); + logger.warn("ZooKeeperLock method tryLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] create zkLockNode(" + this.currentLock + ")success,begin to election..."); // 取所有子节点 List nodes = zk.getChildren(this.rootLockNode, false); // 取所有竞争lockName的锁 @@ -90,18 +93,15 @@ public class ZooKeeperLock implements Watcher { String waitLockNode = this.rootLockNode + "/" + waitLock; Stat stat = zk.exists(waitLockNode, true); if (null != stat) { - System.out.println("线程【" + Thread.currentThread().getName() - + "】锁(" + this.currentLock + ")加锁失败,等待锁(" + waitLockNode + ")释放..."); + logger.error("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] zkLock(" + this.currentLock + ")addZkLock fail,wait lock(" + waitLockNode + ")release..."); // 设置计数器,使用计数器阻塞线程 this.countDownLatch = new CountDownLatch(1); islock = this.countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS); this.countDownLatch = null; if (islock) { - System.out.println("线程【" + Thread.currentThread().getName() + "】锁(" - + this.currentLock + ")加锁成功,锁(" + waitLockNode + ")已经释放"); + logger.warn("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] 
zkLock(" + this.currentLock + ")addZkLock success,lock(" + waitLockNode + ")release over."); } else { - System.out.println("线程【" + Thread.currentThread().getName() + "】锁(" - + this.currentLock + ")加锁失败..."); + logger.error("ZooKeeperLock method waitOtherLock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] zkLock(" + this.currentLock + "addZkLock fail..."); } } else { islock = true; @@ -117,7 +117,7 @@ public class ZooKeeperLock implements Watcher { try { Stat stat = zk.exists(this.currentLock, false); if (null != stat) { - System.out.println("线程【" + Thread.currentThread().getName() + "】释放锁 " + this.currentLock); + logger.warn("ZooKeeperLock method unlock() ===>>> zkLockProcess[[[" + Thread.currentThread().getName() + "]]] free zkLock " + this.currentLock); zk.delete(this.currentLock, -1); this.currentLock = null; } @@ -137,34 +137,4 @@ public class ZooKeeperLock implements Watcher { } } - - public static void doSomething() { - - } - - public static void main(String[] args) { - Runnable runnable = new Runnable() { - @Override - public void run() { - ZooKeeperLock lock = null; - lock = new ZooKeeperLock("192.168.40.119:2181", "/locks", "test1"); - if (lock.lock()) { - doSomething(); - try { -// Thread.sleep(1000); - lock.unlock(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - }; - - for (int i = 0; i < 10; i++) { - Thread t = new Thread(runnable); - t.start(); - } - } - - } diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java index 639b50c..4a022e9 100644 --- a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java +++ b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java @@ -1,6 +1,5 @@ package cn.ac.iie.utils.zookeeper; -import cn.ac.iie.common.FlowWriteConfig; import org.apache.commons.lang3.RandomUtils; import org.apache.log4j.Logger; import org.apache.zookeeper.*; @@ -11,9 +10,7 @@ import java.io.IOException; import 
java.util.List; import java.util.concurrent.CountDownLatch; -/** - * @author qidaijie - */ + public class ZookeeperUtils implements Watcher { private static Logger logger = Logger.getLogger(ZookeeperUtils.class); @@ -36,12 +33,11 @@ public class ZookeeperUtils implements Watcher { * * @param path 节点路径 */ - public int modifyNode(String path) { - createNode("/Snowflake", null, ZooDefs.Ids.OPEN_ACL_UNSAFE); - createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); - int workerId; + public int modifyNode(String path, String zookeeperIp) { + createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp); + int workerId = 0; try { - connectZookeeper(); + connectZookeeper(zookeeperIp); Stat stat = zookeeper.exists(path, true); workerId = Integer.parseInt(getNodeDate(path)); if (workerId > 55) { @@ -58,20 +54,22 @@ public class ZookeeperUtils implements Watcher { } catch (KeeperException | InterruptedException e) { e.printStackTrace(); workerId = RandomUtils.nextInt(56, 63); - } finally { + } + finally { closeConn(); } - logger.error("工作ID是:" + workerId); + logger.warn("workerID is:" + workerId); return workerId; } /** * 连接zookeeper * + * @param host 地址 */ - private void connectZookeeper() { + public void connectZookeeper(String host) { try { - zookeeper = new ZooKeeper(FlowWriteConfig.ZOOKEEPER_SERVERS, SESSION_TIME_OUT, this); + zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); countDownLatch.await(); } catch (IOException | InterruptedException e) { e.printStackTrace(); @@ -81,7 +79,7 @@ public class ZookeeperUtils implements Watcher { /** * 关闭连接 */ - private void closeConn() { + public void closeConn() { try { if (zookeeper != null) { zookeeper.close(); @@ -97,7 +95,7 @@ public class ZookeeperUtils implements Watcher { * @param path 节点路径 * @return 内容/异常null */ - private String getNodeDate(String path) { + public String getNodeDate(String path) { String result = null; Stat stat = new Stat(); try { @@ 
-115,14 +113,18 @@ public class ZookeeperUtils implements Watcher { * @param date 节点所存储的数据的byte[] * @param acls 控制权限策略 */ - private void createNode(String path, byte[] date, List acls) { + public void createNode(String path, byte[] date, List acls, String zookeeperIp) { try { - connectZookeeper(); + connectZookeeper(zookeeperIp); Stat exists = zookeeper.exists(path, true); if (exists == null) { + Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true); + if (existsSnowflakeld == null) { + zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT); + } zookeeper.create(path, date, acls, CreateMode.PERSISTENT); } else { - logger.warn("Node already exists!,Don't need to create"); + logger.warn("Node already exists ! Don't need to create"); } } catch (KeeperException | InterruptedException e) { e.printStackTrace();