diff --git a/pom.xml b/pom.xml
index def6d01..44528db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -67,7 +67,7 @@
 				<directory>properties</directory>
 				<includes>
 					<include>**/*.properties</include>
-					<include>**/*.xml</include>
+
 				</includes>
 				<filtering>false</filtering>
 			</resource>
diff --git a/properties/core-site.xml b/properties/core-site.xml
new file mode 100644
index 0000000..93dfb1d
--- /dev/null
+++ b/properties/core-site.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://ns1</value>
+    </property>
+    <property>
+        <name>hadoop.tmp.dir</name>
+        <value>file:/opt/hadoop/tmp</value>
+    </property>
+    <property>
+        <name>io.file.buffer.size</name>
+        <value>131702</value>
+    </property>
+    <property>
+        <name>hadoop.proxyuser.root.hosts</name>
+        <value>*</value>
+    </property>
+    <property>
+        <name>hadoop.proxyuser.root.groups</name>
+        <value>*</value>
+    </property>
+    <property>
+        <name>hadoop.logfile.size</name>
+        <value>10000000</value>
+        <description>The max size of each log file</description>
+    </property>
+    <property>
+        <name>hadoop.logfile.count</name>
+        <value>1</value>
+        <description>The max number of log files</description>
+    </property>
+    <property>
+        <name>ha.zookeeper.quorum</name>
+        <value>master:2181,slave1:2181,slave2:2181</value>
+    </property>
+    <property>
+        <name>fs.hdfs.impl</name>
+        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
+        <description>The FileSystem for hdfs: uris.</description>
+    </property>
+    <property>
+        <name>io.compression.codecs</name>
+        <value>com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
+    </property>
+    <property>
+        <name>io.compression.codec.lzo.class</name>
+        <value>com.hadoop.compression.lzo.LzoCodec</value>
+    </property>
+</configuration>
diff --git a/properties/hbase-site.xml b/properties/hbase-site.xml
new file mode 100644
index 0000000..54554e4
--- /dev/null
+++ b/properties/hbase-site.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+    <property>
+        <name>hbase.rootdir</name>
+        <value>hdfs://ns1/hbase-1.4.9</value>
+    </property>
+    <property>
+        <name>hbase.cluster.distributed</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hbase.zookeeper.quorum</name>
+        <value>192.168.40.119,192.168.40.122,192.168.40.123</value>
+    </property>
+    <property>
+        <name>hbase.master.info.port</name>
+        <value>60010</value>
+    </property>
+    <property>
+        <name>phoenix.schema.isNamespaceMappingEnabled</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>phoenix.schema.mapSystemTablesToNamespace</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hbase.client.keyvalue.maxsize</name>
+        <value>99428800</value>
+    </property>
+    <property>
+        <name>hbase.server.keyvalue.maxsize</name>
+        <value>99428800</value>
+    </property>
+    <property>
+        <name>hbase.regionserver.wal.codec</name>
+        <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
+    </property>
+    <property>
+        <name>phoenix.query.timeoutMs</name>
+        <value>1800000</value>
+    </property>
+    <property>
+        <name>hbase.rpc.timeout</name>
+        <value>1200000</value>
+    </property>
+    <property>
+        <name>hbase.client.scanner.caching</name>
+        <value>1000</value>
+    </property>
+    <property>
+        <name>hbase.client.scanner.timeout.period</name>
+        <value>1200000</value>
+    </property>
+</configuration>
diff --git a/properties/hdfs-site.xml b/properties/hdfs-site.xml
new file mode 100644
index 0000000..1e148b7
--- /dev/null
+++ b/properties/hdfs-site.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+    <property>
+        <name>dfs.namenode.name.dir</name>
+        <value>file:/home/ceiec/hadoop/dfs/name</value>
+    </property>
+    <property>
+        <name>dfs.datanode.data.dir</name>
+        <value>file:/home/ceiec/hadoop/dfs/data</value>
+    </property>
+    <property>
+        <name>dfs.replication</name>
+        <value>2</value>
+    </property>
+    <property>
+        <name>dfs.namenode.secondary.http-address</name>
+        <value>192.168.40.119:9001</value>
+    </property>
+    <property>
+        <name>dfs.webhdfs.enabled</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>dfs.permissions</name>
+        <value>false</value>
+    </property>
+    <property>
+        <name>dfs.permissions.enabled</name>
+        <value>false</value>
+    </property>
+    <property>
+        <name>dfs.nameservices</name>
+        <value>ns1</value>
+    </property>
+    <property>
+        <name>dfs.blocksize</name>
+        <value>134217728</value>
+    </property>
+    <property>
+        <name>dfs.ha.namenodes.ns1</name>
+        <value>nn1,nn2</value>
+    </property>
+    <property>
+        <name>dfs.namenode.rpc-address.ns1.nn1</name>
+        <value>192.168.40.119:8020</value>
+    </property>
+    <property>
+        <name>dfs.namenode.http-address.ns1.nn1</name>
+        <value>192.168.40.119:50070</value>
+    </property>
+    <property>
+        <name>dfs.namenode.rpc-address.ns1.nn2</name>
+        <value>192.168.40.122:8020</value>
+    </property>
+    <property>
+        <name>dfs.namenode.http-address.ns1.nn2</name>
+        <value>192.168.40.122:50070</value>
+    </property>
+    <property>
+        <name>dfs.namenode.shared.edits.dir</name>
+        <value>qjournal://192.168.40.119:8485;192.168.40.122:8485;192.168.40.123:8485/ns1</value>
+    </property>
+    <property>
+        <name>dfs.journalnode.edits.dir</name>
+        <value>/home/ceiec/hadoop/journal</value>
+    </property>
+    <property>
+        <name>dfs.client.failover.proxy.provider.ns1</name>
+        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
+    </property>
+    <property>
+        <name>dfs.ha.fencing.methods</name>
+        <value>sshfence</value>
+    </property>
+    <property>
+        <name>dfs.ha.fencing.ssh.private-key-files</name>
+        <value>/root/.ssh/id_rsa</value>
+    </property>
+    <property>
+        <name>dfs.ha.fencing.ssh.connect-timeout</name>
+        <value>30000</value>
+    </property>
+    <property>
+        <name>dfs.ha.automatic-failover.enabled</name>
+        <value>true</value>
+    </property>
+</configuration>
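
Note that the pom.xml hunk above drops **/*.xml from the packaged resources, so these three files appear to be kept in the repo for deployment reference rather than bundled into the jar; at runtime the Hadoop client reads whichever core-site.xml/hdfs-site.xml it finds on the classpath. A minimal smoke test of the HA nameservice under that assumption (the class name and checked path are illustrative, not from this repo):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsHaSmokeTest {
        public static void main(String[] args) throws Exception {
            // core-site.xml / hdfs-site.xml are picked up from the classpath;
            // fs.defaultFS=hdfs://ns1 resolves to nn1/nn2 via ConfiguredFailoverProxyProvider.
            Configuration conf = new Configuration();
            try (FileSystem fs = FileSystem.get(conf)) {
                System.out.println(fs.exists(new Path("/hbase-1.4.9"))); // the hbase.rootdir directory
            }
        }
    }
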
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 8646d78..24a5401 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -6,9 +6,10 @@ zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
#hbase zookeeper address
hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
+#hbase.zookeeper.servers=192.168.40.224:2182
#hbase tablename
-hbase.table.name=subcriber_info
+hbase.table.name=subscriber_info
#latest/earliest
auto.offset.reset=latest
@@ -17,22 +18,22 @@ auto.offset.reset=latest
kafka.topic=SECURITY-EVENT-LOG
#Consumer group used when reading the topic; the consumed offsets for this spout id are stored under it, so name it per topology. The stored offset position determines where the next read resumes without re-reading data.
-group.id=security-policy-191112
+group.id=security-policy-191114
#output topic
results.output.topic=SECURITY-EVENT-COMPLETED-LOG
#storm topology workers
-topology.workers=2
+topology.workers=3
#spout parallelism; recommended to equal the number of kafka partitions
-spout.parallelism=3
+spout.parallelism=6
#parallelism of the enrichment (completion) bolt - a multiple of the worker count
-datacenter.bolt.parallelism=10
+datacenter.bolt.parallelism=12
#parallelism for writing to kafka
-kafka.bolt.parallelism=10
+kafka.bolt.parallelism=12
#path of the IP geolocation library
ip.library=/home/ceiec/topology/dat/
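
For orientation, these knobs would typically be wired into the topology roughly as below. This is only a sketch: the spout/bolt instances are stand-ins for whatever this repo actually uses; Config.setNumWorkers and the TopologyBuilder parallelism hints are the real Storm APIs.

    import org.apache.storm.Config;
    import org.apache.storm.topology.TopologyBuilder;

    // Hypothetical wiring - the numbers mirror the properties above
    Config conf = new Config();
    conf.setNumWorkers(3);                                         // topology.workers=3
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-spout", kafkaSpout, 6);                // spout.parallelism=6, one executor per partition
    builder.setBolt("datacenter-bolt", datacenterBolt, 12)         // datacenter.bolt.parallelism=12 (4x the 3 workers)
           .shuffleGrouping("kafka-spout");
    builder.setBolt("kafka-bolt", kafkaBolt, 12)                   // kafka.bolt.parallelism=12
           .shuffleGrouping("datacenter-bolt");
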
diff --git a/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java b/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java
index 2883835..541e7b1 100644
--- a/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java
+++ b/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java
@@ -41,6 +41,7 @@ public class PublicSessionRecordLog {
private String common_user_region;
private String common_client_ip;
private String common_device_id;
+ private String common_sub_action;
private String common_isp;
private String common_sled_ip;
private String common_client_location;
@@ -381,4 +382,12 @@ public class PublicSessionRecordLog {
public void setCommon_stream_trace_id(long common_stream_trace_id) {
this.common_stream_trace_id = common_stream_trace_id;
}
+
+ public String getCommon_sub_action() {
+ return common_sub_action;
+ }
+
+ public void setCommon_sub_action(String common_sub_action) {
+ this.common_sub_action = common_sub_action;
+ }
}
diff --git a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java
index f91a57a..f16e0da 100644
--- a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java
+++ b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java
@@ -8,10 +8,9 @@ import cn.ac.iie.utils.system.FlowWriteConfigurations;
*/
public class FlowWriteConfig {
- public static final String LOG_STRING_SPLITTER = "\t";
- public static final String SQL_STRING_SPLITTER = "#";
+ public static final int IPV4_TYPE = 1;
+ public static final int IPV6_TYPE = 2;
public static final String DOMAIN_SPLITTER = ".";
-
/**
* System
*/
diff --git a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java
index 79e067e..72003c0 100644
--- a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java
+++ b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java
@@ -60,7 +60,7 @@ public class TransFormUtils {
radiusSessionRecordLog.setCommon_server_asn(ipLookup.asnLookup(serverIp, true));
return JSONObject.toJSONString(radiusSessionRecordLog);
} catch (Exception e) {
- logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常");
+ logger.error("{} 日志解析过程出现异常" + FlowWriteConfig.KAFKA_TOPIC);
e.printStackTrace();
return "";
}
diff --git a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java
index f1db9aa..068f619 100644
--- a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java
+++ b/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java
@@ -1,7 +1,8 @@
package cn.ac.iie.utils.hbase;
import cn.ac.iie.common.FlowWriteConfig;
-import com.zdjizhi.utils.StringUtil;
+import cn.ac.iie.utils.system.IpUtils;
+import io.netty.util.collection.IntObjectHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -14,6 +15,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
 * HBase utility class
@@ -23,7 +25,7 @@ import java.util.Map;
public class HBaseUtils {
private final static Logger logger = Logger.getLogger(HBaseUtils.class);
-    private static Map<String, String> subIdMap = new HashMap<>(32);
+    private static Map<String, String> subIdMap = new ConcurrentHashMap<>(333334);
private static Connection connection;
private static Long time;
@@ -32,6 +34,9 @@ public class HBaseUtils {
Configuration configuration = HBaseConfiguration.create();
        // set the zookeeper quorum
configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
+ configuration.set("hbase.client.retries.number", "3");
+ configuration.set("hbase.bulkload.retries.number", "3");
+ configuration.set("zookeeper.recovery.retry", "3");
try {
connection = ConnectionFactory.createConnection(configuration);
time = System.currentTimeMillis();
@@ -47,7 +52,7 @@ public class HBaseUtils {
*/
public static void change() {
Long nowTime = System.currentTimeMillis();
- timestampsFilter(time - 1500, nowTime + 500);
+ timestampsFilter(time - 1000, nowTime + 500);
}
/**
@@ -65,15 +70,23 @@ public class HBaseUtils {
table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
scan2.setTimeRange(startTime, endTime);
scanner = table.getScanner(scan2);
- logger.warn("读取HBase变量耗时:" + (System.currentTimeMillis() - begin));
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
- subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
+ String key = Bytes.toString(CellUtil.cloneRow(cell));
+ String value = Bytes.toString(CellUtil.cloneValue(cell));
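+                    // refresh the entry only when it is new or its value changed, avoiding redundant writes to the shared map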
+ if (subIdMap.containsKey(key)) {
+ if (!value.equals(subIdMap.get(key))) {
+ subIdMap.put(key, value);
+ }
+ } else {
+ subIdMap.put(key, value);
+ }
}
}
+ Long end = System.currentTimeMillis();
logger.warn("当前集合长度" + subIdMap.keySet().size());
- logger.warn("更新缓存耗时:" + (System.currentTimeMillis() - begin));
+ logger.warn("更新缓存耗时:" + (end - begin) + "开始时间:" + begin + "结束时间:" + end);
time = endTime;
} catch (IOException e) {
e.printStackTrace();
@@ -114,18 +127,13 @@ public class HBaseUtils {
}
}
-
/**
 * Look up the account
*
- * @param ip client_ip
+ * @param clientIp client_ip
* @return account
*/
- public static String getAccount(String ip) {
- if (StringUtil.isNotBlank(ip)) {
- return subIdMap.get(ip);
- } else {
- return "";
- }
+ public static String getAccount(String clientIp) {
+ return subIdMap.get(clientIp);
}
}
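
Nothing in this diff shows who drives change(); presumably the topology refreshes the cache on a timer. A sketch of such a driver, assuming a dedicated scheduler thread (illustrative, not taken from this repo):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Re-scan the recent HBase time window every second; change() itself widens
    // the window by the 1000 ms look-back it applies to the last refresh time.
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(HBaseUtils::change, 1, 1, TimeUnit.SECONDS);
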
diff --git a/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java b/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java
index 124344f..c51589b 100644
--- a/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java
+++ b/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java
@@ -45,6 +45,22 @@ public class InfluxDbUtils {
}
}
+ /**
+     * Record the number of enrichment misses, i.e. lookups whose key is absent from the in-memory cache
+     *
+     * @param failure number of failed lookups
+ */
+ public static void sendHBaseFailure(int failure) {
+ if (failure != 0) {
+ InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD);
+ Point point1 = Point.measurement("sendHBaseFailure")
+ .tag("topic", FlowWriteConfig.KAFKA_TOPIC)
+ .field("failure", failure)
+ .build();
+            client.write("BusinessMonitor", "", point1);
+            // close the per-call connection so each report does not leak an HTTP client
+            client.close();
+ }
+ }
+
/**
 * Get the local host IP
*
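
A plausible call site for the new counter (a sketch only; the bolt fragment and variable names are assumed, not from this diff):

    // Hypothetical enrichment-bolt fragment: count cache misses per batch, then report them
    int misses = 0;
    for (String clientIp : batchIps) {
        if (HBaseUtils.getAccount(clientIp) == null) {
            misses++;
        }
    }
    InfluxDbUtils.sendHBaseFailure(misses); // writes nothing when misses == 0
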
diff --git a/src/main/java/cn/ac/iie/utils/system/IpUtils.java b/src/main/java/cn/ac/iie/utils/system/IpUtils.java
new file mode 100644
index 0000000..94fefa6
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/system/IpUtils.java
@@ -0,0 +1,56 @@
+package cn.ac.iie.utils.system;
+
+/**
+ * IP utility class
+ *
+ * @author qidaijie
+ */
+public class IpUtils {
+    /**
+     * IPv4 regex (validIPAddress appends a trailing '.' so the repeated group covers all four octets)
+     */
+    private static final String IPV4 = "^((\\d|[1-9]\\d|1\\d\\d|2([0-4]\\d|5[0-5]))\\.){4}$";
+    /**
+     * IPv6 regex (validIPAddress appends a trailing ':' for the same reason)
+     */
+    private static final String IPV6 = "^(([\\da-fA-F]{1,4}):){8}$";
+
+
+    /**
+     * Determine the IP type, v4 or v6
+     *
+     * @param ip IP
+     * @return 1:v4 2:v6 3:abnormal
+     */
+    public static int validIPAddress(String ip) {
+        return String.format("%s.", ip).matches(IPV4) ? 1 : String.format("%s:", ip).matches(IPV6) ? 2 : 3;
+    }
+
+    /**
+     * Convert an IP string to an int.
+     * The IP is a dot-separated string of integers; as in any radix conversion,
+     * the octets are weighted and summed, the weight here being 256.
+     *
+     * @param ip IP
+     * @return ip(int)
+     */
+    public static int ipChangeInt(String ip) {
+        // split the ip into its four octets
+        String[] ipSplit = ip.split("\\.");
+        int result = 0;
+        for (int i = 0; i < 4; i++) {
+            int ipSubInteger = Integer.parseInt(ipSplit[i]);
+            // the regex has already ruled out negative octets
+            if (ipSubInteger > 255) {
+                result = 0;
+                break;
+            }
+            result += (ipSubInteger << (24 - i * 8));
+        }
+        return result;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(validIPAddress("192.254.254.254"));
+        System.out.println(ipChangeInt("254.254.254.254"));
+    }
+}
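
Worth noting: ipChangeInt packs the address into a signed int, so anything above 127.255.255.255 comes out negative (the main above prints a negative number for 254.254.254.254). If an unsigned value is ever needed, Integer.toUnsignedLong recovers it:

    int packed = IpUtils.ipChangeInt("254.254.254.254"); // -16843010 as a signed int
    long unsigned = Integer.toUnsignedLong(packed);      // 4278124286
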
diff --git a/src/test/java/cn/ac/iie/test/DomainUtils.java b/src/test/java/cn/ac/iie/test/DomainUtils.java
index e7bdf78..ec2cce4 100644
--- a/src/test/java/cn/ac/iie/test/DomainUtils.java
+++ b/src/test/java/cn/ac/iie/test/DomainUtils.java
@@ -5,33 +5,72 @@ import com.zdjizhi.utils.StringUtil;
import javax.xml.bind.SchemaOutputResolver;
import java.net.MalformedURLException;
import java.net.URL;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class DomainUtils {
- private static Pattern pattern = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)");
+ private static Pattern pattern = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.me|\\.cc|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)");
+
+    private final static Set<String> PublicSuffixSet = new HashSet<>(
+            Arrays.asList("com|org|net|gov|edu|co|tv|mobi|info|asia|xxx|onion|cc|cn|com.cn|edu.cn|gov.cn|net.cn|org.cn|jp|kr|tw|com.hk|hk|com.hk|org.hk|se|com.se|org.se"
+                    .split("\\|")));
+ private static Pattern IP_PATTERN = Pattern.compile("(\\d{1,3}\\.){3}(\\d{1,3})");
public static void main(String[] args) {
- System.out.println(getTopDomain("agoo-report.m.taobao.com"));
+ System.out.println(getTopDomain("http://www.ccb.com"));
+ System.out.println(getDomainName("www.comaa.com"));
+
}
private static String getTopDomain(String url) {
-// try {
- //获取值转换为小写
-// String host = new URL(url).getHost().toLowerCase();//news.hexun.com
+ try {
+            // get the host and convert it to lower case
+ String host = new URL(url).getHost().toLowerCase();//news.hexun.com
// Pattern pattern = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)");
- Matcher matcher = pattern.matcher(url);
+ Matcher matcher = IP_PATTERN.matcher(host);
if (matcher.find()){
return matcher.group();
}
-// } catch (MalformedURLException e) {
-// e.printStackTrace();
-// }
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
return null;
}
+
+    /**
+     * Get the top-level (registrable) domain of a host.
+     *
+     * @param host host name, e.g. "news.hexun.com"
+     * @return the registrable domain, or the host itself if it is an IP
+     */
+ public static String getDomainName(String host) {
+// String host = url.getHost();
+ if (host.endsWith(".")){
+ host = host.substring(0, host.length() - 1);
+ }
+ if (IP_PATTERN.matcher(host).matches()){
+ return host;
+ }
+        int index = 0;
+        String candidate = host;
+        while (index >= 0) {
+            index = candidate.indexOf('.');
+            String subCandidate = candidate.substring(index + 1);
+            if (PublicSuffixSet.contains(subCandidate)) {
+                return candidate;
+            }
+            candidate = subCandidate;
+        }
+        return candidate;
+ }
+
+
}
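
To make the suffix walk concrete, two traces through getDomainName (both results follow from PublicSuffixSet above):

    DomainUtils.getDomainName("news.hexun.com.cn");        // -> "hexun.com.cn"  ("com.cn" is a public suffix)
    DomainUtils.getDomainName("agoo-report.m.taobao.com"); // -> "taobao.com"    ("com" is a public suffix)
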
diff --git a/src/test/java/cn/ac/iie/test/a.json b/src/test/java/cn/ac/iie/test/a.json
new file mode 100644
index 0000000..e70c1b8
--- /dev/null
+++ b/src/test/java/cn/ac/iie/test/a.json
@@ -0,0 +1,85 @@
+{
+ "bgp_type": 0,
+ "common_action": 16,
+ "common_address_list": "",
+ "common_address_type": 4,
+ "common_app_id": 0,
+ "common_app_label": "",
+ "common_c2s_byte_num": 639,
+ "common_c2s_pkt_num": 1,
+ "common_client_asn": "36351",
+ "common_client_ip": "75.126.99.155",
+ "common_client_location": "Dallas\tTexas\tUnited States",
+ "common_client_port": 40846,
+ "common_con_duration_ms": 113814,
+ "common_device_id": "4586496",
+ "common_direction": 1,
+ "common_encapsulation": 8,
+ "common_end_time": 1574842412,
+ "common_entrance_id": 8,
+ "common_has_dup_traffic": 1,
+ "common_isp": "China Telecom",
+ "common_l4_protocol": "MPLS",
+ "common_link_id": 1,
+ "common_log_id": 172027081238036520,
+ "common_policy_id": 902,
+ "common_protocol_id": 0,
+ "common_recv_time": 1574842413,
+ "common_s2c_byte_num": 1360,
+ "common_s2c_pkt_num": 26,
+ "common_schema_type": "BGP",
+ "common_server_asn": "9050",
+ "common_server_ip": "92.85.69.150",
+ "common_server_location": "Romania",
+ "common_server_port": 53,
+ "common_service": 8,
+ "common_sled_ip": "192.168.10.58",
+ "common_start_time": 1574842361,
+ "common_stream_dir": 1,
+ "common_stream_error": "",
+ "common_stream_trace_id": 0,
+ "common_subscriber_id": "zareP",
+ "common_user_region": "973ebGTTwdBhecbqI9U724LJdyHWV3BOUIcy4jgtpd221GV2QSOLMZc2awba3GfqKCiQxfirv5NjptRbawXDIpw4pJ0Xg4WZJSKW",
+ "common_user_tags": "qeT9tif1iRp1qCq6pauMO0RqsV13ktQm4Jlp4ZBOFeaQufoJMbC5tQ70ebDI1F9Ffw8c580e9yd27v96M6i4CPN8mEDw1mIkMexT",
+ "dns_aa": 0,
+ "dns_ancount": 0,
+ "dns_arcount": 0,
+ "dns_message_id": 0,
+ "dns_nscount": 0,
+ "dns_opcode": 0,
+ "dns_qclass": 0,
+ "dns_qdcount": 0,
+ "dns_qr": 0,
+ "dns_qtype": 0,
+ "dns_ra": 0,
+ "dns_rcode": 0,
+ "dns_rd": 0,
+ "dns_sub": 0,
+ "dns_tc": 0,
+ "http_content_length": "48895",
+ "http_content_type": "application/x-jpg",
+ "http_domain": "zhiyin.cn",
+ "http_host": "v.zhiyin.cn97991",
+ "http_proxy_flag": 1,
+ "http_referer": "",
+ "http_request_body": "",
+ "http_request_body_key": "",
+ "http_request_header": "",
+ "http_request_line": "",
+ "http_response_body": "",
+ "http_response_body_key": "",
+ "http_response_header": "",
+ "http_response_line": "",
+ "http_sequence": 6,
+ "http_set_cookie": "",
+ "http_snapshot": "",
+ "http_url": "http://v.zhiyin.cn/watch/295.html1661741",
+ "http_user_agent": "",
+ "http_version": "http1",
+ "ssl_cert_verify": 0,
+ "ssl_client_side_latency": 0,
+ "ssl_con_latency_ms": 0,
+ "ssl_intercept_state": 0,
+ "ssl_pinningst": 0,
+ "ssl_server_side_latency": 0
+}
diff --git a/src/test/java/cn/ac/iie/test/hbase/HBaseTest.java b/src/test/java/cn/ac/iie/test/hbase/HBaseTest.java
index 9e94387..44f176a 100644
--- a/src/test/java/cn/ac/iie/test/hbase/HBaseTest.java
+++ b/src/test/java/cn/ac/iie/test/hbase/HBaseTest.java
@@ -2,7 +2,9 @@ package cn.ac.iie.test.hbase;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.hbase.HBaseUtils;
+import cn.ac.iie.utils.system.IpUtils;
import com.zdjizhi.utils.StringUtil;
+import io.netty.util.collection.IntObjectHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -17,10 +19,13 @@ import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
public class HBaseTest {
private final static Logger logger = Logger.getLogger(HBaseTest.class);
-    private static Map<String, String> subIdMap = new HashMap<>(16);
+//    private static Map<String, String> subIdMap = new ConcurrentHashMap<>(13333334);
+    private static Map<String, String> subIdMap = new HashMap<>(13333334);
private static Connection connection;
private static Long time;
@@ -29,9 +34,11 @@ public class HBaseTest {
        // manage the HBase configuration
Configuration configuration = HBaseConfiguration.create();
        // set the zookeeper quorum
- configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
// configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
-// configuration.setInt("zookeeper.session.timeout", 2000);
+ configuration.set("hbase.zookeeper.quorum", "192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181");
+ configuration.set("hbase.client.retries.number", "3");
+ configuration.set("hbase.bulkload.retries.number", "3");
+ configuration.set("zookeeper.recovery.retry", "3");
try {
connection = ConnectionFactory.createConnection(configuration);
time = System.currentTimeMillis();
@@ -41,22 +48,10 @@ public class HBaseTest {
}
}
-// @Before
-// public void connHBase() {
-//
-// }
-
@Test
public void change() {
// Long begin = System.currentTimeMillis();
-// System.gc();
-// Long start = Runtime.getRuntime().freeMemory();
-// System.out.println("开始内存"+start);
// getAll();
-// System.gc();
-// Long end = Runtime.getRuntime().freeMemory();
-// System.out.println("结束内存"+end);
-// System.out.println( "一个HashMap对象占内存: " + (end - start));
// System.out.println(System.currentTimeMillis() - begin);
}
@@ -67,31 +62,39 @@ public class HBaseTest {
 * @param endTime end time
*/
private static void timestampsFilter(Long startTime, Long endTime) {
+ Long begin = System.currentTimeMillis();
Table table = null;
- TableName tableName = TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME);
- Admin admin = null;
+ ResultScanner scanner = null;
+ Scan scan2 = new Scan();
try {
table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
- Scan scan2 = new Scan();
-// scan2.setCaching(500);
scan2.setTimeRange(startTime, endTime);
- ResultScanner scanner = table.getScanner(scan2);
- scanner.next().isEmpty();
+ scanner = table.getScanner(scan2);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
- subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
+// int key = Integer.parseInt(Bytes.toString(CellUtil.cloneRow(cell)));
+ String key = Bytes.toString(CellUtil.cloneRow(cell));
+ String value = Bytes.toString(CellUtil.cloneValue(cell));
+ if (subIdMap.containsKey(key)) {
+ if (!value.equals(subIdMap.get(key))) {
+ subIdMap.put(key, value);
+ }
+ } else {
+ subIdMap.put(key, value);
+ }
}
}
- admin = connection.getAdmin();
- admin.flush(tableName);
+ Long end = System.currentTimeMillis();
logger.warn("当前集合长度" + subIdMap.keySet().size());
- logger.warn("更新后集合keys:" + subIdMap.keySet());
+ logger.warn("更新缓存耗时:" + (end - begin) + "开始时间:" + begin + "结束时间:" + end);
time = endTime;
- scanner.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
+ if (scanner != null) {
+ scanner.close();
+ }
if (table != null) {
try {
table.close();
@@ -105,7 +108,8 @@ public class HBaseTest {
/**
 * Fetch all keys and values
*/
- public static void getAll() {
+ private static void getAll() {
+ Long begin = System.currentTimeMillis();
try {
Table table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
@@ -113,28 +117,32 @@ public class HBaseTest {
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
+// subIdMap.put(Integer.valueOf(Bytes.toString(CellUtil.cloneRow(cell))), Bytes.toString(CellUtil.cloneValue(cell)));
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
}
}
- logger.warn("获取全量后集合size:" + subIdMap.size());
+ logger.warn("获取全量后集合长度:" + subIdMap.size());
+ logger.warn("获取全量耗时:" + (System.currentTimeMillis() - begin));
scanner.close();
} catch (IOException e) {
e.printStackTrace();
}
}
-
/**
 * Get the account
*
- * @param ip client_ip
+ * @param clientIp client_ip
* @return account
*/
- public static String getAccount(String ip) {
- if (StringUtil.isNotBlank(ip)) {
- return subIdMap.get(ip);
- } else {
- return "";
+ public static String getAccount(String clientIp) {
+ int ipType = cn.ac.iie.utils.system.IpUtils.validIPAddress(clientIp);
+ String account = "";
+ if (ipType == FlowWriteConfig.IPV4_TYPE) {
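+            // v4 addresses are looked up by their packed-int key; this only hits if the
+            // cache was populated with int keys (cf. the commented-out int-key puts above)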
+ account = subIdMap.get(IpUtils.ipChangeInt(clientIp));
+ } else if (ipType == FlowWriteConfig.IPV6_TYPE) {
+ account = subIdMap.get(clientIp);
}
+ return account;
}
}
diff --git a/src/test/java/cn/ac/iie/test/hbase/IpUtils.java b/src/test/java/cn/ac/iie/test/hbase/IpUtils.java
new file mode 100644
index 0000000..1cfebe0
--- /dev/null
+++ b/src/test/java/cn/ac/iie/test/hbase/IpUtils.java
@@ -0,0 +1,63 @@
+package cn.ac.iie.test.hbase;
+
+import org.apache.log4j.Logger;
+
+
+public class IpUtils {
+    private static Logger logger = Logger.getLogger(IpUtils.class);
+
+    public static void main(String[] args) {
+        System.out.println(System.currentTimeMillis());
+        System.out.println(System.currentTimeMillis() - 60000);
+    }
+
+    /**
+     * Convert an IP string to an int.
+     * The IP is a dot-separated string of integers; as in any radix conversion,
+     * the octets are weighted and summed, the weight here being 256.
+     *
+     * @param ip dotted-quad IP string
+     * @return packed int
+     */
+    public static int ip2Int(String ip) {
+        // split the ip into its four octets
+        String[] ipStrs = ip.split("\\.");
+        int result = 0;
+        for (int i = 0; i < 4; i++) {
+            int ipSubInteger = Integer.parseInt(ipStrs[i]);
+            // the regex has already ruled out negative octets
+            if (ipSubInteger > 255) {
+                result = 0;
+                break;
+            }
+            result += (ipSubInteger << (24 - i * 8));
+        }
+        return result;
+    }
+
+
+    /**
+     * Convert a packed int back to an IP string
+     *
+     * @param ip packed int
+     * @return dotted-quad IP string
+     */
+    public static String int2Ip(int ip) {
+        StringBuilder builder = new StringBuilder(String.valueOf(ip >>> 24));
+        builder.append(".");
+        builder.append((ip & 0X00FFFFFF) >>> 16);
+        builder.append(".");
+        builder.append((ip & 0X0000FFFF) >>> 8);
+        builder.append(".");
+        builder.append(ip & 0X000000FF);
+        return builder.toString();
+    }
+
+
+    public static int validIPAddress(String ip) {
+        String ipv4 = "^((\\d|[1-9]\\d|1\\d\\d|2([0-4]\\d|5[0-5]))\\.){4}$";
+        // eight groups of 1-4 hex digits, each followed by ':'
+        String ipv6 = "^(([\\da-fA-F]{1,4}):){8}$";
+        return String.format("%s.", ip).matches(ipv4) ? 1 : String.format("%s:", ip).matches(ipv6) ? 2 : 3;
+    }
+}
diff --git a/src/test/java/cn/ac/iie/test/test.java b/src/test/java/cn/ac/iie/test/test.java
index f65cd5f..7876fa2 100644
--- a/src/test/java/cn/ac/iie/test/test.java
+++ b/src/test/java/cn/ac/iie/test/test.java
@@ -1,25 +1,29 @@
package cn.ac.iie.test;
+import cn.ac.iie.common.FlowWriteConfig;
+import cn.ac.iie.utils.general.TransFormUtils;
+import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
+import org.apache.log4j.Logger;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class test {
+ private static Logger logger = Logger.getLogger(test.class);
public static void main(String[] args) {
String message = "{\"str_ea_m-t-r-a-ceid\":\"JSON\",\"uid\":\"0\"}";
// SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class);
// System.out.println(JSONObject.toJSONString(sessionRecordLog));
+ JSONObject obj = JSONObject.parseObject(message);
+ obj.put("abc","bca");
+ System.out.println(obj.toString());
+
}
@Test
public void test2() {
- Map map = new HashMap<>();
- map.put("a","a");
- map.put("b","a");
- map.put("c","a");
-
- System.out.println(map.keySet());
+ logger.error("{} 日志解析过程出现异常" + FlowWriteConfig.KAFKA_TOPIC);
}
}