diff --git a/pom.xml b/pom.xml index 2e03234..4cda26b 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ nexus Team Nexus Repository - http://192.168.10.125:8099/content/groups/public + http://192.168.40.125:8099/content/groups/public @@ -67,6 +67,7 @@ properties **/*.properties + false @@ -84,6 +85,8 @@ UTF-8 1.0.0 1.0.2 + 2.2.1 + 2.7.1 @@ -129,12 +132,6 @@ ${storm.version} - - redis.clients - jedis - 2.8.1 - - junit junit @@ -145,13 +142,18 @@ com.alibaba fastjson - 1.2.47 + 1.2.59 + + + cglib + cglib-nodep + 3.2.4 com.zdjizhi galaxy - 1.0.1 + 1.0.2 slf4j-log4j12 @@ -181,6 +183,110 @@ + + + org.apache.hbase + hbase-client + ${hbase.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hbase + hbase-server + ${hbase.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + org.junit.jupiter + junit-jupiter-api + 5.3.2 + compile + + + + + org.apache.httpcomponents + httpclient + 4.5.2 + + + + org.apache.httpcomponents + httpcore + 4.4.1 + + diff --git a/properties/core-site.xml b/properties/core-site.xml new file mode 100644 index 0000000..93dfb1d --- /dev/null +++ b/properties/core-site.xml @@ -0,0 +1,71 @@ + + + + + + + + + fs.defaultFS + hdfs://ns1 + + + hadoop.tmp.dir + file:/opt/hadoop/tmp + + + io.file.buffer.size + 131702 + + + hadoop.proxyuser.root.hosts + * + + + hadoop.proxyuser.root.groups + * + + + hadoop.logfile.size + 10000000 + The max size of each log file + + + + hadoop.logfile.count + 1 + The max number of log files + + + ha.zookeeper.quorum + master:2181,slave1:2181,slave2:2181 + + +    +     fs.hdfs.impl   +     org.apache.hadoop.hdfs.DistributedFileSystem   +     The FileSystem for hdfs: uris.   + + + +io.compression.codecs +com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec + + +io.compression.codec.lzo.class +com.hadoop.compression.lzo.LzoCodec + + + diff --git a/properties/hbase-site.xml b/properties/hbase-site.xml new file mode 100644 index 0000000..54554e4 --- /dev/null +++ b/properties/hbase-site.xml @@ -0,0 +1,77 @@ + + + + + + hbase.rootdir + hdfs://ns1/hbase-1.4.9 + + + hbase.cluster.distributed + true + + + hbase.zookeeper.quorum + 192.168.40.119,192.168.40.122,192.168.40.123 + + +hbase.master.info.port +60010 + + + + phoenix.schema.isNamespaceMappingEnabled + true + + + phoenix.schema.mapSystemTablesToNamespace + true + + + hbase.client.keyvalue.maxsize + 99428800 + + + hbase.server.keyvalue.maxsize + 99428800 + + + hbase.regionserver.wal.codec + org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec + + + phoenix.query.timeoutMs + 1800000 + + + hbase.rpc.timeout + 1200000 + + + hbase.client.scanner.caching + 1000 + + + hbase.client.scanner.timeout.period + 1200000 + + diff --git a/properties/hdfs-site.xml b/properties/hdfs-site.xml new file mode 100644 index 0000000..1e148b7 --- /dev/null +++ b/properties/hdfs-site.xml @@ -0,0 +1,116 @@ + + + + + + + + + dfs.namenode.name.dir + file:/home/ceiec/hadoop/dfs/name + + + dfs.datanode.data.dir + file:/home/ceiec/hadoop/dfs/data + + + dfs.replication + 2 + + + dfs.namenode.secondary.http-address + 192.168.40.119:9001 + + + dfs.webhdfs.enabled + true + + + dfs.permissions + false + + + dfs.permissions.enabled + false + + + dfs.nameservices + ns1 + + + dfs.blocksize + 134217728 + + + dfs.ha.namenodes.ns1 + nn1,nn2 + + + + dfs.namenode.rpc-address.ns1.nn1 + 192.168.40.119:8020 + + + + dfs.namenode.http-address.ns1.nn1 + 192.168.40.119:50070 + + + + dfs.namenode.rpc-address.ns1.nn2 + 192.168.40.122:8020 + + + + dfs.namenode.http-address.ns1.nn2 + 192.168.40.122:50070 + + + + dfs.namenode.shared.edits.dir + qjournal://192.168.40.119:8485;192.168.40.122:8485;192.168.40.123:8485/ns1 + + + + dfs.journalnode.edits.dir + /home/ceiec/hadoop/journal + + + + dfs.client.failover.proxy.provider.ns1 + org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider + + + + dfs.ha.fencing.methods + sshfence + + + + dfs.ha.fencing.ssh.private-key-files + /root/.ssh/id_rsa + + + + dfs.ha.fencing.ssh.connect-timeout + 30000 + + + + dfs.ha.automatic-failover.enabled + true + + + diff --git a/properties/redis_config.properties b/properties/redis_config.properties deleted file mode 100644 index f99d396..0000000 --- a/properties/redis_config.properties +++ /dev/null @@ -1,19 +0,0 @@ -#*****************jedis连接参数设置********************* -#redis服务器ip -redis.ip=192.168.40.123 -#redis服务器端口号 -redis.port=6379 -#与服务器建立连接的超时时间 -redis.timeout=3000 -#************************jedis池参数设置******************* -#jedis的最大活跃连接数 -redis.pool.maxActive=200 -#jedis最大空闲连接数 -redis.pool.maxIdle=5 -#jedis池没有连接对象返回时,等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。 -#如果超过等待时间,则直接抛出JedisConnectionException -redis.pool.maxWait=-1 -#从池中获取连接的时候,是否进行有效检查 -redis.pool.testOnBorrow=true -#归还连接的时候,是否进行有效检查 -redis.pool.testOnReturn=true diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 3077812..04a60a3 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,49 +1,59 @@ #管理kafka地址 -#bootstrap.servers=10.4.35.7:9092,10.4.35.8:9092,10.4.35.9:9092 -bootstrap.servers=192.168.6.200:9093,192.168.6.200:9094,192.168.6.200:9095 +#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092 +bootstrap.servers=192.168.40.151:9092 #zookeeper 地址 -zookeeper.servers=192.168.6.200:2181 -#zookeeper.servers=192.168.40.207:2181 +zookeeper.servers=192.168.40.151:2181 +#zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 + +#hbase zookeeper地址 +#hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 +hbase.zookeeper.servers=192.168.40.151:2181 + +#hbase tablename +hbase.table.name=subscriber_info #latest/earliest auto.offset.reset=latest #kafka broker下的topic名称 -kafka.topic=SESSION-RECORD-LOG -#kafka.topic=Snowflake-test +kafka.topic=SECURITY-EVENT-LOG #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=session-record-log-z +group.id=security-policy-200224 #输出topic -#results.output.topic=SESSION-TEST-COMPLETED-LOG -results.output.topic=SESSION-RECORD-COMPLETED-LOG +results.output.topic=SECURITY-EVENT-COMPLETED-LOG #storm topology workers topology.workers=1 #spout并行度 建议与kafka分区数相同 -spout.parallelism=3 +spout.parallelism=1 #处理补全操作的bolt并行度-worker的倍数 -datacenter.bolt.parallelism=3 +datacenter.bolt.parallelism=1 -#写入kafkad的并行度 -kafka.bolt.parallelism=3 +#写入kafka的并行度10 +kafka.bolt.parallelism=1 #定位库地址 -ip.library=/dat/ - +#ip.library=/home/ceiec/topology/dat/ +#ip.library=D:\\workerSpace\\K18-Phase2\\3.0.2019115\\log-stream-completion\\ +ip.library=D:\\dat\\ #kafka批量条数 -batch.insert.num=5000 - +batch.insert.num=2000 +#网关的schema位置 +schema.http=http://192.168.40.151:9999/metadata/schema/v1/fields/security_event_log #数据中心(UID) data.center.id.num=15 #tick时钟频率 topology.tick.tuple.freq.secs=5 +#hbase 更新时间 +hbase.tick.tuple.freq.secs=60 + #当bolt性能受限时,限制spout接收速度,理论看ack开启才有效 topology.config.max.spout.pending=150000 diff --git a/src/main/java/cn/ac/iie/bean/SessionRecordLog.java b/src/main/java/cn/ac/iie/bean/SessionRecordLog.java deleted file mode 100644 index baa3f81..0000000 --- a/src/main/java/cn/ac/iie/bean/SessionRecordLog.java +++ /dev/null @@ -1,672 +0,0 @@ -package cn.ac.iie.bean; - -import com.alibaba.fastjson.annotation.JSONField; -import com.alibaba.fastjson.support.spring.annotation.FastJsonFilter; - -/** - * @author qidaijie - */ -public class SessionRecordLog { - private long uid; - private int policy_id; - private long action; - private int start_time; - private int end_time; - private long recv_time; - private String trans_proto; - private String app_proto; - private int addr_type; - private String server_ip; - private String client_ip; - private int server_port; - private int client_port; - private int service; - private int entrance_id; - private int device_id; - private int Link_id; - private String isp; - private int encap_type; - private int direction; - private int stream_dir; - private String cap_ip; - private String addr_list; - private String server_location; - private String client_location; - private String client_asn; - private String server_asn; - private String subscribe_id; - private long con_duration_ms; - private String url; - private String host; - private String domain; - private String category; - private String req_line; - private String res_line; - private String cookie; - private String referer; - private String user_agent; - private String content_len; - private String content_type; - private String set_cookie; - private String req_header; - private String resp_header; - private String req_body_key; - private String req_body; - private String res_body_key; - private String resp_body; - private String version; - private String sni; - private String san; - private String cn; - private int app_id; - private int protocol_id; - private long con_latency_ms; - private int pinningst; - private int intercept_state; - private long ssl_server_side_latency; - private long ssl_client_side_latency; - private String ssl_server_side_version; - private String ssl_client_side_version; - private int ssl_cert_verify; - private String stream_trace_id; - private String ssl_error; - private long c2s_pkt_num; - private long S2c_pkt_num; - private long c2s_byte_num; - private long s2c_byte_num; - private String nas_ip; - private String framed_ip; - private String account; - private int packet_type; - private int has_dup_traffic; - private String stream_error; - - - public SessionRecordLog() { - } - - public long getUid() { - return uid; - } - - public void setUid(long uid) { - this.uid = uid; - } - - public int getPolicy_id() { - return policy_id; - } - - public void setPolicy_id(int policy_id) { - this.policy_id = policy_id; - } - - public long getAction() { - return action; - } - - public void setAction(long action) { - this.action = action; - } - - public int getStart_time() { - return start_time; - } - - public void setStart_time(int start_time) { - this.start_time = start_time; - } - - public int getEnd_time() { - return end_time; - } - - public void setEnd_time(int end_time) { - this.end_time = end_time; - } - - public String getSsl_error() { - return ssl_error; - } - - public void setSsl_error(String ssl_error) { - this.ssl_error = ssl_error; - } - - public String getApp_proto() { - return app_proto; - } - - public void setApp_proto(String app_proto) { - this.app_proto = app_proto; - } - - public long getRecv_time() { - return recv_time; - } - - public void setRecv_time(long recv_time) { - this.recv_time = recv_time; - } - - public String getTrans_proto() { - return trans_proto; - } - - public void setTrans_proto(String trans_proto) { - this.trans_proto = trans_proto; - } - - public int getAddr_type() { - return addr_type; - } - - public void setAddr_type(int addr_type) { - this.addr_type = addr_type; - } - - public String getServer_ip() { - return server_ip; - } - - public void setServer_ip(String server_ip) { - this.server_ip = server_ip; - } - - public String getClient_ip() { - return client_ip; - } - - public void setClient_ip(String client_ip) { - this.client_ip = client_ip; - } - - public int getServer_port() { - return server_port; - } - - public void setServer_port(int server_port) { - this.server_port = server_port; - } - - public int getClient_port() { - return client_port; - } - - public void setClient_port(int client_port) { - this.client_port = client_port; - } - - public int getService() { - return service; - } - - public void setService(int service) { - this.service = service; - } - - public int getEntrance_id() { - return entrance_id; - } - - public void setEntrance_id(int entrance_id) { - this.entrance_id = entrance_id; - } - - public int getDevice_id() { - return device_id; - } - - public void setDevice_id(int device_id) { - this.device_id = device_id; - } - - public int getLink_id() { - return Link_id; - } - - public void setLink_id(int link_id) { - Link_id = link_id; - } - - public String getIsp() { - return isp; - } - - public void setIsp(String isp) { - this.isp = isp; - } - - public int getEncap_type() { - return encap_type; - } - - public void setEncap_type(int encap_type) { - this.encap_type = encap_type; - } - - public int getDirection() { - return direction; - } - - public void setDirection(int direction) { - this.direction = direction; - } - - public int getStream_dir() { - return stream_dir; - } - - public void setStream_dir(int stream_dir) { - this.stream_dir = stream_dir; - } - - public String getCap_ip() { - return cap_ip; - } - - public void setCap_ip(String cap_ip) { - this.cap_ip = cap_ip; - } - - public String getAddr_list() { - return addr_list; - } - - public void setAddr_list(String addr_list) { - this.addr_list = addr_list; - } - - public String getServer_location() { - return server_location; - } - - public void setServer_location(String server_location) { - this.server_location = server_location; - } - - public String getClient_location() { - return client_location; - } - - public void setClient_location(String client_location) { - this.client_location = client_location; - } - - public String getClient_asn() { - return client_asn; - } - - public void setClient_asn(String client_asn) { - this.client_asn = client_asn; - } - - public String getServer_asn() { - return server_asn; - } - - public void setServer_asn(String server_asn) { - this.server_asn = server_asn; - } - - public String getSubscribe_id() { - return subscribe_id; - } - - public void setSubscribe_id(String subscribe_id) { - this.subscribe_id = subscribe_id; - } - - public long getCon_duration_ms() { - return con_duration_ms; - } - - public void setCon_duration_ms(long con_duration_ms) { - this.con_duration_ms = con_duration_ms; - } - - public String getUrl() { - return url; - } - - public void setUrl(String url) { - this.url = url; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public String getDomain() { - return domain; - } - - public void setDomain(String domain) { - this.domain = domain; - } - - public String getCategory() { - return category; - } - - public void setCategory(String category) { - this.category = category; - } - - public String getReq_line() { - return req_line; - } - - public void setReq_line(String req_line) { - this.req_line = req_line; - } - - public String getRes_line() { - return res_line; - } - - public void setRes_line(String res_line) { - this.res_line = res_line; - } - - public String getCookie() { - return cookie; - } - - public void setCookie(String cookie) { - this.cookie = cookie; - } - - public String getReferer() { - return referer; - } - - public void setReferer(String referer) { - this.referer = referer; - } - - public String getUser_agent() { - return user_agent; - } - - public void setUser_agent(String user_agent) { - this.user_agent = user_agent; - } - - public String getContent_len() { - return content_len; - } - - public void setContent_len(String content_len) { - this.content_len = content_len; - } - - public String getContent_type() { - return content_type; - } - - public void setContent_type(String content_type) { - this.content_type = content_type; - } - - public String getSet_cookie() { - return set_cookie; - } - - public void setSet_cookie(String set_cookie) { - this.set_cookie = set_cookie; - } - - public String getReq_header() { - return req_header; - } - - public void setReq_header(String req_header) { - this.req_header = req_header; - } - - public String getResp_header() { - return resp_header; - } - - public void setResp_header(String resp_header) { - this.resp_header = resp_header; - } - - public String getReq_body_key() { - return req_body_key; - } - - public void setReq_body_key(String req_body_key) { - this.req_body_key = req_body_key; - } - - public String getReq_body() { - return req_body; - } - - public void setReq_body(String req_body) { - this.req_body = req_body; - } - - public String getRes_body_key() { - return res_body_key; - } - - public void setRes_body_key(String res_body_key) { - this.res_body_key = res_body_key; - } - - public String getResp_body() { - return resp_body; - } - - public void setResp_body(String resp_body) { - this.resp_body = resp_body; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public String getSni() { - return sni; - } - - public void setSni(String sni) { - this.sni = sni; - } - - public String getSan() { - return san; - } - - public void setSan(String san) { - this.san = san; - } - - public String getCn() { - return cn; - } - - public void setCn(String cn) { - this.cn = cn; - } - - public int getApp_id() { - return app_id; - } - - public void setApp_id(int app_id) { - this.app_id = app_id; - } - - public int getProtocol_id() { - return protocol_id; - } - - public void setProtocol_id(int protocol_id) { - this.protocol_id = protocol_id; - } - - public int getIntercept_state() { - return intercept_state; - } - - public void setIntercept_state(int intercept_state) { - this.intercept_state = intercept_state; - } - - public long getSsl_server_side_latency() { - return ssl_server_side_latency; - } - - public void setSsl_server_side_latency(long ssl_server_side_latency) { - this.ssl_server_side_latency = ssl_server_side_latency; - } - - public long getSsl_client_side_latency() { - return ssl_client_side_latency; - } - - public void setSsl_client_side_latency(long ssl_client_side_latency) { - this.ssl_client_side_latency = ssl_client_side_latency; - } - - public String getSsl_server_side_version() { - return ssl_server_side_version; - } - - public void setSsl_server_side_version(String ssl_server_side_version) { - this.ssl_server_side_version = ssl_server_side_version; - } - - public String getSsl_client_side_version() { - return ssl_client_side_version; - } - - public void setSsl_client_side_version(String ssl_client_side_version) { - this.ssl_client_side_version = ssl_client_side_version; - } - - public int getSsl_cert_verify() { - return ssl_cert_verify; - } - - public void setSsl_cert_verify(int ssl_cert_verify) { - this.ssl_cert_verify = ssl_cert_verify; - } - - public String getStream_trace_id() { - return stream_trace_id; - } - - public void setStream_trace_id(String stream_trace_id) { - this.stream_trace_id = stream_trace_id; - } - - public long getCon_latency_ms() { - return con_latency_ms; - } - - public void setCon_latency_ms(long con_latency_ms) { - this.con_latency_ms = con_latency_ms; - } - - public int getPinningst() { - return pinningst; - } - - public void setPinningst(int pinningst) { - this.pinningst = pinningst; - } - - - public long getC2s_pkt_num() { - return c2s_pkt_num; - } - - public void setC2s_pkt_num(long c2s_pkt_num) { - this.c2s_pkt_num = c2s_pkt_num; - } - - public long getS2c_pkt_num() { - return S2c_pkt_num; - } - - public void setS2c_pkt_num(long s2c_pkt_num) { - S2c_pkt_num = s2c_pkt_num; - } - - public long getC2s_byte_num() { - return c2s_byte_num; - } - - public void setC2s_byte_num(long c2s_byte_num) { - this.c2s_byte_num = c2s_byte_num; - } - - public long getS2c_byte_num() { - return s2c_byte_num; - } - - public void setS2c_byte_num(long s2c_byte_num) { - this.s2c_byte_num = s2c_byte_num; - } - - public String getNas_ip() { - return nas_ip; - } - - public void setNas_ip(String nas_ip) { - this.nas_ip = nas_ip; - } - - public String getFramed_ip() { - return framed_ip; - } - - public void setFramed_ip(String framed_ip) { - this.framed_ip = framed_ip; - } - - public String getAccount() { - return account; - } - - public void setAccount(String account) { - this.account = account; - } - - public int getPacket_type() { - return packet_type; - } - - public void setPacket_type(int packet_type) { - this.packet_type = packet_type; - } - - public int getHas_dup_traffic() { - return has_dup_traffic; - } - - public void setHas_dup_traffic(int has_dup_traffic) { - this.has_dup_traffic = has_dup_traffic; - } - - public String getStream_error() { - return stream_error; - } - - public void setStream_error(String stream_error) { - this.stream_error = stream_error; - } -} diff --git a/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java index c19acff..da4786b 100644 --- a/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java +++ b/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java @@ -24,11 +24,11 @@ import java.util.Map; * @date 2018/8/14 */ public class NtcLogSendBolt extends BaseBasicBolt { - private static final long serialVersionUID = 3940515789830317517L; + private static final long serialVersionUID = -3663610927224396615L; private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); private List list; private KafkaLogNtc kafkaLogNtc; - private static long successfulSum = 0; +// private static long successfulSum = 0; @Override @@ -43,11 +43,11 @@ public class NtcLogSendBolt extends BaseBasicBolt { if (TupleUtils.isTick(tuple)) { if (list.size() != 0) { kafkaLogNtc.sendMessage(list); - successfulSum += list.size(); +// successfulSum += list.size(); list.clear(); } - basicOutputCollector.emit(new Values(successfulSum)); - successfulSum = 0L; +// basicOutputCollector.emit(new Values(successfulSum)); +// successfulSum = 0L; } else { String message = tuple.getString(0); if (StringUtil.isNotBlank(message)) { @@ -55,12 +55,12 @@ public class NtcLogSendBolt extends BaseBasicBolt { } if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) { kafkaLogNtc.sendMessage(list); - successfulSum += list.size(); +// successfulSum += list.size(); list.clear(); } } } catch (Exception e) { - logger.error("日志发送Kafka过程出现异常 ", e); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常"); e.printStackTrace(); } } @@ -74,7 +74,7 @@ public class NtcLogSendBolt extends BaseBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declare(new Fields("suc")); +// outputFieldsDeclarer.declare(new Fields("suc")); } } diff --git a/src/main/java/cn/ac/iie/bolt/collect/CollectCompletedBolt.java b/src/main/java/cn/ac/iie/bolt/collect/CollectCompletedBolt.java new file mode 100644 index 0000000..a7c4162 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/collect/CollectCompletedBolt.java @@ -0,0 +1,68 @@ +package cn.ac.iie.bolt.collect; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.utils.HBaseUtils; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + + +import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage; + + + +/** + * 通联关系日志补全 + * + * @author qidaijie + */ +public class CollectCompletedBolt extends BaseBasicBolt { + private final static Logger logger = Logger.getLogger(CollectCompletedBolt.class); + private static final long serialVersionUID = 4682827168247333522L; + + @Override + public void prepare(Map stormConf, TopologyContext context) { + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + if (TupleUtils.isTick(tuple)) { + HBaseUtils.change(); + } else { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(dealCommonMessage(message))); + } + } + } catch (Exception e) { + logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); + e.printStackTrace(); + } + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, + FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS); + return conf; + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("connLog")); + } + +} diff --git a/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java new file mode 100644 index 0000000..a2783ee --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java @@ -0,0 +1,66 @@ +package cn.ac.iie.bolt.proxy; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.utils.HBaseUtils; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage; + +/** + * 通联关系日志补全 + * + * @author qidaijie + */ + +public class ProxyCompletionBolt extends BaseBasicBolt { + private final static Logger logger = Logger.getLogger(ProxyCompletionBolt.class); + private static final long serialVersionUID = 6097654428594885032L; + + @Override + public void prepare(Map stormConf, TopologyContext context) { + + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + if (TupleUtils.isTick(tuple)) { + HBaseUtils.change(); + } else { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(dealCommonMessage(message))); + } + } + } catch (Exception e) { + logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); + e.printStackTrace(); + } + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, + FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS); + return conf; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("connLog")); + } + +} diff --git a/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java similarity index 63% rename from src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java rename to src/main/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java index e67b6cf..5293059 100644 --- a/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java +++ b/src/main/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java @@ -1,5 +1,6 @@ -package cn.ac.iie.bolt; +package cn.ac.iie.bolt.radius; +import cn.ac.iie.common.FlowWriteConfig; import com.zdjizhi.utils.StringUtil; import org.apache.log4j.Logger; import org.apache.storm.task.TopologyContext; @@ -12,16 +13,17 @@ import org.apache.storm.tuple.Values; import java.util.Map; -import static cn.ac.iie.utils.general.TransFormUtils.getJsonMessage; +import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage; /** * 通联关系日志补全 * * @author qidaijie */ -public class ConnCompletionBolt extends BaseBasicBolt { - private static final long serialVersionUID = -1059151670138465894L; - private final static Logger logger = Logger.getLogger(ConnCompletionBolt.class); +public class RadiusCompletionBolt extends BaseBasicBolt { + + private final static Logger logger = Logger.getLogger(RadiusCompletionBolt.class); + private static final long serialVersionUID = -3657802387129063952L; @Override public void prepare(Map stormConf, TopologyContext context) { @@ -33,13 +35,15 @@ public class ConnCompletionBolt extends BaseBasicBolt { try { String message = tuple.getString(0); if (StringUtil.isNotBlank(message)) { - basicOutputCollector.emit(new Values(getJsonMessage(message))); + basicOutputCollector.emit(new Values(dealCommonMessage(message))); } } catch (Exception e) { - logger.error("接收解析过程出现异常", e); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); + e.printStackTrace(); } } + @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("connLog")); diff --git a/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java new file mode 100644 index 0000000..8069257 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java @@ -0,0 +1,68 @@ +package cn.ac.iie.bolt.security; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.system.TupleUtils; +import com.zdjizhi.utils.HBaseUtils; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage; + + +/** + * 通联关系日志补全 + * + * @author qidaijie + */ + +public class SecurityCompletionBolt extends BaseBasicBolt { + + private final static Logger logger = Logger.getLogger(SecurityCompletionBolt.class); + private static final long serialVersionUID = -2380858260054733989L; + + @Override + public void prepare(Map stormConf, TopologyContext context) { + + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + if (TupleUtils.isTick(tuple)) { + HBaseUtils.change(); + } else { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(dealCommonMessage(message))); + } + } + } catch (Exception e) { + logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); + e.printStackTrace(); + } + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, + FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS); + return conf; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("connLog")); + } + +} diff --git a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java index 26e2173..17f03da 100644 --- a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java +++ b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java @@ -8,10 +8,9 @@ import cn.ac.iie.utils.system.FlowWriteConfigurations; */ public class FlowWriteConfig { - public static final String LOG_STRING_SPLITTER = "\t"; - public static final String SQL_STRING_SPLITTER = "#"; - public static final String SEGMENTATION = ","; - + public static final int IPV4_TYPE = 1; + public static final int IPV6_TYPE = 2; + public static final String DOMAIN_SPLITTER = "."; /** * System */ @@ -20,6 +19,7 @@ public class FlowWriteConfig { public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers"); public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism"); public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs"); + public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending"); public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks"); public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time"); @@ -40,6 +40,8 @@ public class FlowWriteConfig { */ public static final String BOOTSTRAP_SERVERS = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers"); public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers"); + public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(0, "hbase.table.name"); public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); public static final String RESULTS_OUTPUT_TOPIC = FlowWriteConfigurations.getStringProperty(0, "results.output.topic"); public static final String KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "kafka.topic"); @@ -47,17 +49,9 @@ public class FlowWriteConfig { public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library"); - - /*** - * Redis + /** + * http */ - public static final String REDIS_IP = "redis.ip"; - public static final String REDIS_PORT = "redis.port"; - public static final String REDIS_TIMEOUT = "redis.timeout"; - public static final String REDIS_POOL_MAXACTIVE = "redis.pool.maxActive"; - public static final String REDIS_POOL_MAXIDLE = "redis.pool.maxIdle"; - public static final String REDIS_POOL_MAXWAIT = "redis.pool.maxWait"; - public static final String REDIS_POOL_TESTONBORROW = "redis.pool.testOnBorrow"; - public static final String REDIS_POOL_TESTONRETURN = "redis.pool.testOnReturn"; + public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http"); } \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java index 0f806fa..d89ec41 100644 --- a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java +++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java @@ -37,6 +37,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout { props.put("max.poll.records", 3000); props.put("max.partition.fetch.bytes", 31457280); props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; diff --git a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java index ede06c3..d4eda58 100644 --- a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java +++ b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java @@ -1,9 +1,13 @@ package cn.ac.iie.topology; -import cn.ac.iie.bolt.ConnCompletionBolt; import cn.ac.iie.bolt.NtcLogSendBolt; -import cn.ac.iie.bolt.SummaryBolt; +import cn.ac.iie.bolt.collect.CollectCompletedBolt; +import cn.ac.iie.bolt.radius.RadiusCompletionBolt; +import cn.ac.iie.bolt.security.SecurityCompletionBolt; + +import cn.ac.iie.bolt.proxy.ProxyCompletionBolt; + import cn.ac.iie.common.FlowWriteConfig; import cn.ac.iie.spout.CustomizedKafkaSpout; import org.apache.log4j.Logger; @@ -58,9 +62,28 @@ public class LogFlowWriteTopology { private void buildTopology() { builder = new TopologyBuilder(); builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); - builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); - builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt"); -// builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt"); + + switch (FlowWriteConfig.KAFKA_TOPIC) { + case "PROXY-EVENT-LOG": + builder.setBolt("ProxyCompletionBolt", new ProxyCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ProxyCompletionBolt"); + break; + case "RADIUS-RECORD-LOG": + builder.setBolt("RadiusCompletionBolt", new RadiusCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("RadiusCompletionBolt"); + break; + case "CONNECTION-RECORD-LOG": + builder.setBolt("CollectCompletedBolt", new CollectCompletedBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("CollectCompletedBolt"); + break; + case "SECURITY-EVENT-LOG": + builder.setBolt("SecurityCompletionBolt", new SecurityCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("SecurityCompletionBolt"); + break; + + default: + } + } public static void main(String[] args) throws Exception { diff --git a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java index 9c0bb8f..3c7b318 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java +++ b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java @@ -1,19 +1,18 @@ package cn.ac.iie.utils.general; -import cn.ac.iie.bean.SessionRecordLog; + import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.redis.RedisPollUtils; -import cn.ac.iie.utils.system.SnowflakeId; -import cn.ac.iie.utils.zookeeper.DistributedLock; -import cn.ac.iie.utils.zookeeper.ZookeeperUtils; +import cn.ac.iie.utils.hbase.HBaseUtils; +import cn.ac.iie.utils.json.JsonParseUtil; import com.alibaba.fastjson.JSONObject; +import com.google.common.net.InternetDomainName; +import com.zdjizhi.utils.FormatUtils; import com.zdjizhi.utils.IpLookup; import com.zdjizhi.utils.StringUtil; import org.apache.log4j.Logger; -import redis.clients.jedis.Jedis; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.*; + /** @@ -22,102 +21,179 @@ import java.util.regex.Pattern; * @author qidaijie * @create 2018-08-13 15:11 */ + public class TransFormUtils { private static Logger logger = Logger.getLogger(TransFormUtils.class); - private static Pattern WEB_PATTERN = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)"); + + //在内存中加载反射类用的map + private static HashMap map = JsonParseUtil.getMapFromhttp(FlowWriteConfig.SCHEMA_HTTP); + //反射成一个类 + private static Object mapObject = JsonParseUtil.generateObject(map); + //获取任务列表 + private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); + //补全工具类 + private static FormatUtils build = new FormatUtils.Builder(false).build(); + //IP定位库工具类 private static IpLookup ipLookup = new IpLookup.Builder(false) .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") .loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb") .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb") .build(); -// private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); -// private static SnowflakeId snowflakeId = new SnowflakeId(); - /** * 解析日志,并补全 * - * @param message 原始日志 + * @param message kafka Topic原始日志 * @return 补全后的日志 */ - public static String getJsonMessage(String message) { - SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class); - String serverIp = sessionRecordLog.getServer_ip(); - String clientIp = sessionRecordLog.getClient_ip(); + public static String dealCommonMessage(String message) { + + Object object = JSONObject.parseObject(message, mapObject.getClass()); +// System.out.println("补全之前 ===》 "+JSON.toJSONString(object)); try { - sessionRecordLog.setUid(SnowflakeId.generateId()); - sessionRecordLog.setServer_location(ipLookup.countryLookup(serverIp)); - sessionRecordLog.setClient_location(ipLookup.cityLookupDetail(clientIp)); - sessionRecordLog.setClient_asn(ipLookup.asnLookup(clientIp, true)); - sessionRecordLog.setServer_asn(ipLookup.asnLookup(serverIp, true)); - sessionRecordLog.setDomain(getTopDomain(sessionRecordLog.getSni(), sessionRecordLog.getHost())); - sessionRecordLog.setRecv_time(System.currentTimeMillis() / 1000); -// sessionRecordLog.setSubscribe_id(getSubscribeId(clientIp)); - return JSONObject.toJSONString(sessionRecordLog); + for (String[] strings : jobList) { + //用到的参数的值 + Object name = JsonParseUtil.getValue(object, strings[0]); + //需要补全的字段的值 + Object appendTo = JsonParseUtil.getValue(object, strings[1]); + //匹配操作函数的字段 + String function=strings[2]; + //额外的参数的值 + Object param = null; + if (strings[3] != null){ + param=JsonParseUtil.getValue(object, strings[3]); + } + + + if (function.equals("current_timestamp")) { + JsonParseUtil.setValue(object, strings[1], getCurrentTime()); + } else if (function.equals("snowflake_id")) { + JsonParseUtil.setValue(object, strings[1], build.getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS,FlowWriteConfig.KAFKA_TOPIC,FlowWriteConfig.DATA_CENTER_ID_NUM)); + } else if (function.equals("geo_ip_detail")) { + JsonParseUtil.setValue(object, strings[1], getGeoIpDetail(name.toString())); + } else if (function.equals("geo_asn")) { + JsonParseUtil.setValue(object, strings[1], getGeoAsn(name.toString())); + } else if (function.equals("radius_match")) { + JsonParseUtil.setValue(object, strings[1], radiusMatch(name.toString())); + } else if (function.equals("geo_ip_country")) { + JsonParseUtil.setValue(object, strings[1], getGeoIpCountry(name.toString())); + } else if (function.equals("decode_of_base64") && param != null){ + JsonParseUtil.setValue(object, strings[1], FormatUtils.base64Str(name.toString(),param.toString())); + } else if (name.equals("http_host") && function.equals("sub_domain")) { + if (appendTo == null || StringUtil.isBlank(appendTo.toString())) { + JsonParseUtil.setValue(object, strings[1], getTopDomain(null, name.toString())); + } + } else if (name.equals("ssl_sni") && strings[2].equals("sub_domain")) { + if (appendTo == null || StringUtil.isBlank(appendTo.toString())) { + JsonParseUtil.setValue(object, strings[1], getTopDomain(name.toString(), null)); + } + + } + + } + + + return JSONObject.toJSONString(object); +// System.out.println("补全之后 ===》 "+JSON.toJSONString(object)); + } catch (Exception e) { - logger.error("日志解析过程出现异常", e); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常"); + e.printStackTrace(); return ""; } } + +// @Test +// public void aaa() { +// String sni = "www.baidu.com"; +// System.out.println(getTopDomain(sni, null)); +// System.out.println(getTopDomain(null,sni)); +// +// } + /** - * 有sni通过sni获取域名,有hots根据host获取域名 + * 有sni通过sni获取域名,有host根据host获取域名 * * @param sni sni * @param host host * @return 顶级域名 */ private static String getTopDomain(String sni, String host) { - if (StringUtil.isNotBlank(sni)) { - return getDomain(sni); - } else if (StringUtil.isNotBlank(host)) { - return getDomain(host); + if (StringUtil.isNotBlank(host)) { + return getDomainName(host); + } else if (StringUtil.isNotBlank(sni)) { + return getDomainName(sni); } else { return ""; } } - /** - * 获取用户名 - * - * @param key Sip - * @return SubscribeId - */ - private static String getSubscribeId(String key) { - String sub = ""; - try (Jedis jedis = RedisPollUtils.getJedis()) { - if (jedis != null) { - sub = jedis.get(key); - } - } catch (Exception e) { - logger.error("通过Redis获取用户名出现异常", e); - } - return sub; - } - /** * 根据url截取顶级域名 * - * @param url 网站url + * @param host 网站url * @return 顶级域名 */ - private static String getDomain(String url) { + private static String getDomainName(String host) { + String domain = ""; try { - Matcher matcher = WEB_PATTERN.matcher(url); - if (matcher.find()) { - return matcher.group(); - } + domain = InternetDomainName.from(host).topPrivateDomain().toString(); } catch (Exception e) { - e.printStackTrace(); + logger.error("host解析顶级域名异常: " + e.getMessage()); } - return ""; + return domain; } - public static void main(String[] args) { - String s = ipLookup.countryLookup("192.168.10.207"); - System.out.println(s); + /** + * 生成当前时间戳的操作 + */ + private static int getCurrentTime() { + return (int)(System.currentTimeMillis() / 1000); } -} + /** + * 根据clientIp获取location信息 + * + * @param ip + * @return + */ + private static String getGeoIpDetail(String ip) { + + return ipLookup.cityLookupDetail(ip); + } + + /** + * 根据ip获取asn信息 + * + * @param ip + * @return + */ + private static String getGeoAsn(String ip) { + + return ipLookup.asnLookup(ip, true); + } + + /** + * 根据ip获取country信息 + * + * @param ip + * @return + */ + private static String getGeoIpCountry(String ip) { + + return ipLookup.countryLookup(ip); + } + + /** + * radius借助hbase补齐 + * + * @param ip + * @return + */ + private static String radiusMatch(String ip) { + return HBaseUtils.getAccount(ip); + } +} \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java new file mode 100644 index 0000000..5341e9d --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java @@ -0,0 +1,138 @@ +package cn.ac.iie.utils.hbase; + +import cn.ac.iie.common.FlowWriteConfig; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * HBase 工具类 + * + * @author qidaijie + */ + +public class HBaseUtils { + private final static Logger logger = Logger.getLogger(HBaseUtils.class); + private static Map subIdMap = new HashMap<>(333334); + // private static Map subIdMap = new ConcurrentSkipListMap<>(); + private static Connection connection; + private static Long time; + + static { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS); + configuration.set("hbase.client.retries.number", "3"); + configuration.set("hbase.bulkload.retries.number", "3"); + configuration.set("zookeeper.recovery.retry", "3"); + try { + connection = ConnectionFactory.createConnection(configuration); + time = System.currentTimeMillis(); + getAll(); + } catch (IOException e) { + logger.error("获取HBase连接失败"); + e.printStackTrace(); + } + } + + /** + * 更新变量 + */ + public static void change() { + Long nowTime = System.currentTimeMillis(); + timestampsFilter(time - 1000, nowTime + 500); + } + + /** + * 获取变更内容 + * + * @param startTime 开始时间 + * @param endTime 结束时间 + */ + private static void timestampsFilter(Long startTime, Long endTime) { + Long begin = System.currentTimeMillis(); + Table table = null; + ResultScanner scanner = null; + Scan scan2 = new Scan(); + try { + table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME)); + scan2.setTimeRange(startTime, endTime); + scanner = table.getScanner(scan2); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + String key = Bytes.toString(CellUtil.cloneRow(cell)); + String value = Bytes.toString(CellUtil.cloneValue(cell)); + if (subIdMap.containsKey(key)) { + if (!value.equals(subIdMap.get(key))) { + subIdMap.put(key, value); + } + } else { + subIdMap.put(key, value); + } + } + } + Long end = System.currentTimeMillis(); + logger.warn("当前集合长度" + subIdMap.keySet().size()); + logger.warn("更新缓存耗时:" + (end - begin) + "开始时间:" + begin + "结束时间:" + end); + time = endTime; + } catch (IOException e) { + e.printStackTrace(); + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + try { + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + /** + * 获取所有的 key value + */ + private static void getAll() { + Long begin = System.currentTimeMillis(); + try { + Table table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME)); + Scan scan2 = new Scan(); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + } + } + logger.warn("获取全量后集合长度:" + subIdMap.size()); + logger.warn("获取全量耗时:" + (System.currentTimeMillis() - begin)); + scanner.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 获取 account + * + * @param clientIp client_ip + * @return account + */ + public static String getAccount(String clientIp) { + return subIdMap.get(clientIp); + } + +} diff --git a/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java b/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java new file mode 100644 index 0000000..2aa8885 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/http/HttpClientUtil.java @@ -0,0 +1,51 @@ +package cn.ac.iie.utils.http; + +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +/** + * 获取网关schema的工具类 + */ +public class HttpClientUtil { + public static String requestByGetMethod(String s) { + CloseableHttpClient httpClient = HttpClients.createDefault(); + StringBuilder entityStringBuilder = null; + try { + HttpGet get = new HttpGet(s); + CloseableHttpResponse httpResponse = null; + httpResponse = httpClient.execute(get); + try { + HttpEntity entity = httpResponse.getEntity(); + entityStringBuilder = new StringBuilder(); + if (null != entity) { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); + String line = null; + while ((line = bufferedReader.readLine()) != null) { + entityStringBuilder.append(line); + } + } + } finally { + httpResponse.close(); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + if (httpClient != null) { + httpClient.close(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + return entityStringBuilder.toString(); + } + +} diff --git a/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java b/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java index 124344f..c51589b 100644 --- a/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java +++ b/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java @@ -45,6 +45,22 @@ public class InfluxDbUtils { } } + /** + * 记录对准失败次数-即内存中没有对应的key + * + * @param failure 对准失败量 + */ + public static void sendHBaseFailure(int failure) { + if (failure != 0) { + InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD); + Point point1 = Point.measurement("sendHBaseFailure") + .tag("topic", FlowWriteConfig.KAFKA_TOPIC) + .field("failure", failure) + .build(); + client.write("BusinessMonitor", "", point1); + } + } + /** * 获取本机IP * diff --git a/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java b/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java new file mode 100644 index 0000000..9ef09fc --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/json/JsonParseUtil.java @@ -0,0 +1,231 @@ +package cn.ac.iie.utils.json; + +import cn.ac.iie.utils.http.HttpClientUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import net.sf.cglib.beans.BeanGenerator; +import net.sf.cglib.beans.BeanMap; + +import java.util.*; + +/** + * 使用fastjson解析json的工具类 + */ +public class JsonParseUtil { + + /** + * 模式匹配,给定一个类型字符串返回一个类类型 + * + * @param type + * @return + */ + + public static Class getClassName(String type) { + Class clazz; + + switch (type) { + case "int": + clazz = Integer.class; + break; + case "String": + clazz = String.class; + break; + case "long": + clazz = long.class; + break; + case "Integer": + clazz = Integer.class; + break; + case "double": + clazz = double.class; + break; + case "float": + clazz = float.class; + break; + case "char": + clazz = char.class; + break; + case "byte": + clazz = byte.class; + break; + case "boolean": + clazz = boolean.class; + break; + case "short": + clazz = short.class; + break; + default: + clazz = String.class; + } + return clazz; + } + + /** + * 根据反射生成对象的方法 + * + * @param properties + * @return 生成的Object类型的对象 + */ + public static Object generateObject(Map properties) { + BeanGenerator generator = new BeanGenerator(); + Set keySet = properties.keySet(); + for (Iterator i = keySet.iterator(); i.hasNext(); ) { + String key = (String) i.next(); + generator.addProperty(key, (Class) properties.get(key)); + } + return generator.create(); + } + + /** + * 获取属性值的方法 + * + * @param obj + * @param property + * @return 属性的值 + */ + public static Object getValue(Object obj, String property) { + BeanMap beanMap = BeanMap.create(obj); + return beanMap.get(property); + } + + /** + * 更新属性值的方法 + * + * @param obj + * @param property + * @param value + */ + public static void setValue(Object obj, String property, Object value) { + BeanMap beanMap = BeanMap.create(obj); + beanMap.put(property, value); + } + + /** + * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + * + * @param http + * @return 用于反射生成schema类型的对象的一个map集合 + */ + public static HashMap getMapFromhttp(String http) { + HashMap map = new HashMap<>(); + + String schema = HttpClientUtil.requestByGetMethod(http); + Object data = JSON.parseObject(schema).get("data"); + + //获取fields,并转化为数组,数组的每个元素都是一个name doc type + JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + String name = JSON.parseObject(field.toString()).get("name").toString(); + String type = JSON.parseObject(field.toString()).get("type").toString(); +// if( +// name.equals("dns_qr") || +// name.equals("dns_opcode") || +// name.equals("ssl_pinningst") || +// name.equals("ssl_intercept_state") || +// name.equals("ssl_cert_verify") +// +// ){ +// type="Integer"; +// } + + + //组合用来生成实体类的map + + map.put(name, getClassName(type)); + + } + + + return map; + } + + + /** + * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList) + * + * @param http + * @return + */ + public static ArrayList getJobListFromHttp(String http) { + ArrayList list = new ArrayList<>(); + + String schema = HttpClientUtil.requestByGetMethod(http); + //解析data + Object data = JSON.parseObject(schema).get("data"); + + //获取fields,并转化为数组,数组的每个元素都是一个name doc type + JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + Object doc = JSON.parseObject(field.toString()).get("doc"); + String name = JSON.parseObject(field.toString()).get("name").toString(); + + if (doc != null) { + Object format = JSON.parseObject(doc.toString()).get("format"); + + if (format != null) { + + String functions = null; + String appendTo = null; + String params = null; + Object functionsObj = JSON.parseObject(format.toString()).get("functions"); + Object appendToObj = JSON.parseObject(format.toString()).get("appendTo"); + Object paramObj = JSON.parseObject(format.toString()).get("param"); + + if (functionsObj != null) { + functions = functionsObj.toString(); + } + + if (appendToObj != null) { + appendTo = appendToObj.toString(); + } + if (paramObj != null) { + params = paramObj.toString(); + } + + + if (appendTo != null && params == null) { + String[] functionArray = functions.split(","); + String[] appendToArray = appendTo.split(","); + + for (int i = 0; i < functionArray.length; i++) { +// useList.add(name); +// toList.add(appendToArray[i]); +// funcList.add(functionArray[i]); + list.add(new String[]{name, appendToArray[i], functionArray[i],null}); + + } + }else if (appendTo != null && params != null){ + String[] functionArray = functions.split(","); + String[] appendToArray = appendTo.split(","); + String[] paramArray = params.split(","); + + for (int i = 0; i < functionArray.length; i++) { +// useList.add(name); +// toList.add(appendToArray[i]); +// funcList.add(functionArray[i]); + list.add(new String[]{name, appendToArray[i], functionArray[i],paramArray[i]}); + + } + } + + else { +// useList.add(name); +// funcList.add(functions.toString()); +// toList.add(name); + list.add(new String[]{name, name, functions,params}); + } + + } + } + + } + return list; + } + + +} \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java b/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java index 11ae57a..78deae0 100644 --- a/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java +++ b/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java @@ -59,7 +59,7 @@ public class KafkaLogNtc { } } kafkaProducer.flush(); - logger.warn("Log sent to National Center successfully!!!!!"); + logger.debug("Log sent to National Center successfully!!!!!"); } /** @@ -72,10 +72,10 @@ public class KafkaLogNtc { properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("acks", "1"); properties.put("linger.ms", "2"); - properties.put("request.timeout.ms", 60000); + properties.put("request.timeout.ms", 30000); properties.put("batch.size", 262144); properties.put("buffer.memory", 33554432); - properties.put("compression.type", "snappy"); +// properties.put("compression.type", "snappy"); kafkaProducer = new KafkaProducer<>(properties); } diff --git a/src/main/java/cn/ac/iie/utils/redis/RedisClusterUtils.java b/src/main/java/cn/ac/iie/utils/redis/RedisClusterUtils.java deleted file mode 100644 index e7f67d9..0000000 --- a/src/main/java/cn/ac/iie/utils/redis/RedisClusterUtils.java +++ /dev/null @@ -1,79 +0,0 @@ -package cn.ac.iie.utils.redis; - -import cn.ac.iie.common.FlowWriteConfig; -import org.apache.log4j.Logger; -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.JedisPoolConfig; - -import java.io.IOException; -import java.util.LinkedHashSet; -import java.util.Properties; -import java.util.Set; - -/** - * 预用于对准IP对应的用户名的 Redis连接池 - * - * @author my - * @date 2018-07-04 - */ -public final class RedisClusterUtils { - private static final Logger logger = Logger.getLogger(RedisClusterUtils.class); - private static JedisCluster jedisCluster; - private static Properties props = new Properties(); - - static { - try { - String redisConfigFile = "redis_config.properties"; - props.load(RedisClusterUtils.class.getClassLoader().getResourceAsStream(redisConfigFile)); - } catch (IOException e) { - props = null; - logger.error("加载Redis配置文件失败!", e); - } - } - - /** - * 不允许通过new创建该类的实例 - */ - private RedisClusterUtils() { - } - - /** - * 初始化Redis连接池 - */ - private static JedisCluster getJedisCluster() { - if (jedisCluster == null) { - JedisPoolConfig poolConfig = new JedisPoolConfig(); - poolConfig.setMaxTotal(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXACTIVE))); - poolConfig.setMaxIdle(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXIDLE))); - poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXWAIT))); - poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONRETURN))); - poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONBORROW))); - Set nodes = new LinkedHashSet(); - for (String port : props.getProperty(FlowWriteConfig.REDIS_PORT).split(FlowWriteConfig.SEGMENTATION)) { - for (String ip : props.getProperty(FlowWriteConfig.REDIS_IP).split(FlowWriteConfig.SEGMENTATION)) { - nodes.add(new HostAndPort(ip, Integer.parseInt(port))); - } - } - jedisCluster = new JedisCluster(nodes, poolConfig); - } - return jedisCluster; - } - - /** - * 获取用户名 - * - * @param key service_ip - * @return Subscribe_id - */ - public static String get(String key) { - String s = key.split("\\.")[0]; - if (!FlowWriteConfig.CHECK_IP_SCOPE.contains(s)) { - jedisCluster = getJedisCluster(); - return jedisCluster.get(key); - } - return ""; - } - - -} diff --git a/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java b/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java deleted file mode 100644 index 378bef5..0000000 --- a/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java +++ /dev/null @@ -1,115 +0,0 @@ -package cn.ac.iie.utils.redis; - -import cn.ac.iie.common.FlowWriteConfig; -import com.zdjizhi.utils.StringUtil; -import org.apache.commons.lang3.RandomUtils; -import org.apache.log4j.Logger; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; - -import java.util.Properties; - -/** - * @author qidaijie - */ -public class RedisPollUtils { - private static final Logger logger = Logger.getLogger(RedisPollUtils.class); - private static JedisPool jedisPool = null; - private static Properties props = new Properties(); - - - private RedisPollUtils() { - } - - static { - initialPool(); - - } - - /** - * 初始化Redis连接池 - */ - private static void initialPool() { - try { - //加载连接池配置文件 - props.load(RedisPollUtils.class.getClassLoader().getResourceAsStream("redis_config.properties")); - // 创建jedis池配置实例 - JedisPoolConfig poolConfig = new JedisPoolConfig(); - poolConfig.setMaxTotal(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXACTIVE))); - poolConfig.setMaxIdle(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXIDLE))); - poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXWAIT))); - poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONRETURN))); - poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONBORROW))); - // 根据配置实例化jedis池 - jedisPool = new JedisPool(poolConfig, props.getProperty(FlowWriteConfig.REDIS_IP), - Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_PORT))); - } catch (Exception e) { - logger.error("Redis连接池初始化错误", e); - } - } - - /** - * 获取Jedis实例 - * - * @return Jedis实例 - */ - public static Jedis getJedis() { - Jedis jedis = null; - try { - if (jedisPool == null) { - initialPool(); - } - jedis = jedisPool.getResource(); - } catch (Exception e) { - logger.error("Redis连接池错误,无法获取连接", e); - } - return jedis; - } - -// /** -// * @param key redis key -// * @return value -// */ -// public static Integer getWorkerId(String key) { -// int workId = 0; -// int maxId = 32; -// try (Jedis jedis = RedisPollUtils.getJedis()) { -// if (jedis != null) { -// String work = jedis.get(key); -// if (StringUtil.isBlank(work)) { -// jedis.set(key, "0"); -// } else { -// workId = Integer.parseInt(work); -// } -// if (workId < maxId) { -// jedis.set(key, String.valueOf(workId + 1)); -// } else { -// workId = 0; -// jedis.set(key, "1"); -// } -// } -// } catch (Exception e) { -// logger.error("通过Redis获取用户名出现异常", e); -// workId = RandomUtils.nextInt(0, 31); -// } -// return workId; -// } - - public static Integer getWorkerId(String key) { - int workId = 0; - try (Jedis jedis = RedisPollUtils.getJedis()) { - if (jedis != null) { - workId = Integer.parseInt(jedis.get(key)); - jedis.set(key, String.valueOf(workId + 2)); - logger.error("\n工作id是:" + workId + "\n"); - } - } catch (Exception e) { - logger.error("通过Redis获取用户名出现异常", e); - workId = RandomUtils.nextInt(0, 31); - } - return workId; - } - - -} diff --git a/src/main/java/cn/ac/iie/utils/system/IpUtils.java b/src/main/java/cn/ac/iie/utils/system/IpUtils.java new file mode 100644 index 0000000..94fefa6 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/system/IpUtils.java @@ -0,0 +1,56 @@ +package cn.ac.iie.utils.system; + +/** + * IP工具类 + * + * @author qidaijie + */ +public class IpUtils { + /** + * IPV4 正则 + */ + private static final String IPV4 = "^((\\d|[1-9]\\d|1\\d\\d|2([0-4]\\d|5[0-5]))\\.){4}$"; + /** + * IPV6正则 + */ + private static final String IPV6 = "^(([\\da-fA-F]{1,4}):){8}$"; + + + /** + * 判断IP类型 v4 or v6 + * + * @param ip IP + * @return 1:v4 2:v6 3:abnormal + */ + public static int validIPAddress(String ip) { + return String.format("%s.", ip).matches(IPV4) ? 1 : String.format("%s:", ip).matches(IPV6) ? 2 : 3; + } + + /** + * ip字符串转整数 + * ip是.分割的整数字符串,按照r进制转十进制的规律,按权相加求和,这里的权是256. + * + * @param ip IP + * @return ip(int) + */ + public static int ipChangeInt(String ip) { + //分割ip + String[] ipSplit = ip.split("\\."); + int result = 0; + for (int i = 0; i < 4; i++) { + Integer ipSubInteger = Integer.parseInt(ipSplit[i]); + //正则验证不能为负数 + if (ipSubInteger > 255) { + result = 0; + break; + } + result += (ipSubInteger << (24 - i * 8)); + } + return result; + } + + public static void main(String[] args) { + System.out.println(validIPAddress("192.254.254.254")); + System.out.println(ipChangeInt("254.254.254.254")); + } +} diff --git a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java deleted file mode 100644 index 5f77996..0000000 --- a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java +++ /dev/null @@ -1,190 +0,0 @@ -package cn.ac.iie.utils.system; - -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.zookeeper.DistributedLock; -import cn.ac.iie.utils.zookeeper.ZookeeperUtils; -import org.apache.log4j.Logger; - -/** - * 雪花算法 - * - * @author qidaijie - */ -public class SnowflakeId { - private static Logger logger = Logger.getLogger(SnowflakeId.class); - - // ==============================Fields=========================================== - /** - * 开始时间截 (2018-08-01 00:00:00) max 17years - */ - private final long twepoch = 1564588800000L; - - /** - * 机器id所占的位数 - */ - private final long workerIdBits = 6L; - - /** - * 数据标识id所占的位数 - */ - private final long dataCenterIdBits = 4L; - - /** - * 支持的最大机器id,结果是3 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) - */ - private final long maxWorkerId = -1L ^ (-1L << workerIdBits); - - /** - * 支持的最大数据标识id,结果是15 - */ - private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits); - - /** - * 序列在id中占的位数 - */ - private final long sequenceBits = 14L; - - /** - * 机器ID向左移12位 - */ - private final long workerIdShift = sequenceBits; - - /** - * 数据标识id向左移17位(14+6) - */ - private final long dataCenterIdShift = sequenceBits + workerIdBits; - - /** - * 时间截向左移22位(4+6+14) - */ - private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits; - - /** - * 生成序列的掩码,这里为16383 - */ - private final long sequenceMask = -1L ^ (-1L << sequenceBits); - - /** - * 工作机器ID(0~63) - */ - private long workerId; - - /** - * 数据中心ID(0~15) - */ - private long dataCenterId; - - /** - * 毫秒内序列(0~16383) - */ - private long sequence = 0L; - - /** - * 上次生成ID的时间截 - */ - private long lastTimestamp = -1L; - - - private static SnowflakeId idWorker; - - private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); - - static { - idWorker = new SnowflakeId(); - } - - //==============================Constructors===================================== - - /** - * 构造函数 - */ - private SnowflakeId() { - DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); - lock.lock(); - int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC); - if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { - throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); - } - int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM; - if (dataCenterId > maxDataCenterId || dataCenterId < 0) { - throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId)); - } - this.workerId = tmpWorkerId; - this.dataCenterId = dataCenterId; - } - - // ==============================Methods========================================== - - /** - * 获得下一个ID (该方法是线程安全的) - * - * @return SnowflakeId - */ - private synchronized long nextId() { - long timestamp = timeGen(); - - //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 - if (timestamp < lastTimestamp) { - throw new RuntimeException( - String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); - } - - //如果是同一时间生成的,则进行毫秒内序列 - if (lastTimestamp == timestamp) { - sequence = (sequence + 1) & sequenceMask; - //毫秒内序列溢出 - if (sequence == 0) { - //阻塞到下一个毫秒,获得新的时间戳 - timestamp = tilNextMillis(lastTimestamp); - } - } - //时间戳改变,毫秒内序列重置 - else { - sequence = 0L; - } - - //上次生成ID的时间截 - lastTimestamp = timestamp; - - //移位并通过或运算拼到一起组成64位的ID - return ((timestamp - twepoch) << timestampLeftShift) - | (dataCenterId << dataCenterIdShift) - | (workerId << workerIdShift) - | sequence; - } - - /** - * 阻塞到下一个毫秒,直到获得新的时间戳 - * - * @param lastTimestamp 上次生成ID的时间截 - * @return 当前时间戳 - */ - protected long tilNextMillis(long lastTimestamp) { - long timestamp = timeGen(); - while (timestamp <= lastTimestamp) { - timestamp = timeGen(); - } - return timestamp; - } - - /** - * 返回以毫秒为单位的当前时间 - * - * @return 当前时间(毫秒) - */ - protected long timeGen() { - return System.currentTimeMillis(); - } - - - /** - * 静态工具类 - * - * @return - */ - public static Long generateId() { - return idWorker.nextId(); - } - - -} \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java b/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java index 15f4506..46a7ff2 100644 --- a/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java +++ b/src/main/java/cn/ac/iie/utils/zookeeper/DistributedLock.java @@ -1,7 +1,5 @@ package cn.ac.iie.utils.zookeeper; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.system.SnowflakeId; import org.apache.log4j.Logger; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; @@ -15,9 +13,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; -/** - * @author qidaijie - */ + public class DistributedLock implements Lock, Watcher { private static Logger logger = Logger.getLogger(DistributedLock.class); @@ -83,7 +79,7 @@ public class DistributedLock implements Lock, Watcher { } try { if (this.tryLock()) { - System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁"); + logger.warn(Thread.currentThread().getName() + " " + lockName + " is being locked......"); } else { // 等待锁 waitForLock(waitLock, sessionTimeout); @@ -98,7 +94,7 @@ public class DistributedLock implements Lock, Watcher { try { String splitStr = "_lock_"; if (lockName.contains(splitStr)) { - throw new LockException("锁名有误"); + throw new LockException("locked name is error!!!"); } // 创建临时有序节点 currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], @@ -187,33 +183,4 @@ public class DistributedLock implements Lock, Watcher { super(e); } } - - public static void main(String[] args) { - ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); - - Runnable runnable = new Runnable() { - @Override - public void run() { - DistributedLock lock = null; - try { - lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); - lock.lock(); -// System.out.println(SnowflakeId.generateId()); - System.out.println(1); - Thread.sleep(3000); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - if (lock != null) { - lock.unlock(); - } - } - } - }; - - for (int i = 0; i < 10; i++) { - Thread t = new Thread(runnable); - t.start(); - } - } } \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java deleted file mode 100644 index 639b50c..0000000 --- a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java +++ /dev/null @@ -1,134 +0,0 @@ -package cn.ac.iie.utils.zookeeper; - -import cn.ac.iie.common.FlowWriteConfig; -import org.apache.commons.lang3.RandomUtils; -import org.apache.log4j.Logger; -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -/** - * @author qidaijie - */ -public class ZookeeperUtils implements Watcher { - private static Logger logger = Logger.getLogger(ZookeeperUtils.class); - - private ZooKeeper zookeeper; - - private static final int SESSION_TIME_OUT = 20000; - - private CountDownLatch countDownLatch = new CountDownLatch(1); - - @Override - public void process(WatchedEvent event) { - if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { - countDownLatch.countDown(); - } - } - - - /** - * 修改节点信息 - * - * @param path 节点路径 - */ - public int modifyNode(String path) { - createNode("/Snowflake", null, ZooDefs.Ids.OPEN_ACL_UNSAFE); - createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); - int workerId; - try { - connectZookeeper(); - Stat stat = zookeeper.exists(path, true); - workerId = Integer.parseInt(getNodeDate(path)); - if (workerId > 55) { - workerId = 0; - zookeeper.setData(path, "1".getBytes(), stat.getVersion()); - } else { - String result = String.valueOf(workerId + 1); - if (stat != null) { - zookeeper.setData(path, result.getBytes(), stat.getVersion()); - } else { - logger.error("Node does not exist!,Can't modify"); - } - } - } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); - workerId = RandomUtils.nextInt(56, 63); - } finally { - closeConn(); - } - logger.error("工作ID是:" + workerId); - return workerId; - } - - /** - * 连接zookeeper - * - */ - private void connectZookeeper() { - try { - zookeeper = new ZooKeeper(FlowWriteConfig.ZOOKEEPER_SERVERS, SESSION_TIME_OUT, this); - countDownLatch.await(); - } catch (IOException | InterruptedException e) { - e.printStackTrace(); - } - } - - /** - * 关闭连接 - */ - private void closeConn() { - try { - if (zookeeper != null) { - zookeeper.close(); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - /** - * 获取节点内容 - * - * @param path 节点路径 - * @return 内容/异常null - */ - private String getNodeDate(String path) { - String result = null; - Stat stat = new Stat(); - try { - byte[] resByte = zookeeper.getData(path, true, stat); - result = new String(resByte); - } catch (KeeperException | InterruptedException e) { - logger.error("Get node information exception"); - e.printStackTrace(); - } - return result; - } - - /** - * @param path 节点创建的路径 - * @param date 节点所存储的数据的byte[] - * @param acls 控制权限策略 - */ - private void createNode(String path, byte[] date, List acls) { - try { - connectZookeeper(); - Stat exists = zookeeper.exists(path, true); - if (exists == null) { - zookeeper.create(path, date, acls, CreateMode.PERSISTENT); - } else { - logger.warn("Node already exists!,Don't need to create"); - } - } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); - } finally { - closeConn(); - } - } - -} diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties index c355401..17c0e9a 100644 --- a/src/main/java/log4j.properties +++ b/src/main/java/log4j.properties @@ -12,7 +12,7 @@ log4j.appender.file.Threshold=error log4j.appender.file.encoding=UTF-8 log4j.appender.file.Append=true #路径请用相对路径,做好相关测试输出到应用目下 -log4j.appender.file.file=galaxy-name.log +log4j.appender.file.file=storm-topology.log log4j.appender.file.DatePattern='.'yyyy-MM-dd log4j.appender.file.layout=org.apache.log4j.PatternLayout #log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n diff --git a/src/test/java/cn/ac/iie/test/DistributedLock.java b/src/test/java/cn/ac/iie/test/DistributedLock.java index 030bf4a..0d45cb8 100644 --- a/src/test/java/cn/ac/iie/test/DistributedLock.java +++ b/src/test/java/cn/ac/iie/test/DistributedLock.java @@ -10,7 +10,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.zookeeper.ZookeeperUtils; +import com.zdjizhi.utils.ZookeeperUtils; import org.apache.log4j.Logger; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; @@ -183,7 +183,7 @@ public class DistributedLock implements Lock, Watcher { try { lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); lock.lock(); - zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC); + zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC,FlowWriteConfig.ZOOKEEPER_SERVERS); } finally { if (lock != null) { lock.unlock(); diff --git a/src/test/java/cn/ac/iie/test/DomainUtils.java b/src/test/java/cn/ac/iie/test/DomainUtils.java index e7bdf78..a693047 100644 --- a/src/test/java/cn/ac/iie/test/DomainUtils.java +++ b/src/test/java/cn/ac/iie/test/DomainUtils.java @@ -1,37 +1,93 @@ package cn.ac.iie.test; +import com.google.common.net.InternetDomainName; import com.zdjizhi.utils.StringUtil; import javax.xml.bind.SchemaOutputResolver; import java.net.MalformedURLException; +import java.net.URISyntaxException; import java.net.URL; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; public class DomainUtils { - - private static Pattern pattern = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)"); + private final static Set PublicSuffixSet = new HashSet( + Arrays.asList("com|edu|gov|int|mil|net|org|biz|info|pro|name|museum|coop|aero|xxx|idv|top|xyz|xin|vip|win|red|wang|co|mobi|travel|club|post|rec|asia" + .split("\\|"))); public static void main(String[] args) { - System.out.println(getTopDomain("agoo-report.m.taobao.com")); - } +// InternetDomainName.from("foo.co.uk").topPrivateDomain().toString(); + String host = "www.aaa.co.uk"; +// if (host.contains(":")){ +// String s = host.split(":")[0]; +// System.out.println(InternetDomainName.from(s)); +// System.out.println(InternetDomainName.from(s).topPrivateDomain()); +// }else { +// System.out.println(InternetDomainName.from(host).topDomainUnderRegistrySuffix()); + System.out.println(InternetDomainName.from(host).topPrivateDomain()); - private static String getTopDomain(String url) { -// try { - //获取值转换为小写 -// String host = new URL(url).getHost().toLowerCase();//news.hexun.com -// Pattern pattern = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)"); - Matcher matcher = pattern.matcher(url); - if (matcher.find()){ - return matcher.group(); - } -// } catch (MalformedURLException e) { -// e.printStackTrace(); // } - return null; +// System.out.println(InternetDomainName.from("shence.hupu.com").topPrivateDomain()); } + /** + * 获取url的顶级域名 + * // * @param url + * + * @return + */ + public static String getDomainName(String host) { + if (host.endsWith(".") || host.contains("/")) { + host = host.substring(0, host.length() - 1); + } + int index = 0; + String candidate = host; + for (; index >= 0; ) { + index = candidate.indexOf('.'); + String subCandidate = candidate.substring(index + 1); + if (PublicSuffixSet.contains(subCandidate)) { + return candidate; + } + candidate = subCandidate; + } + return candidate; + } + + + public static String getSourceDomain(String host) { + if (host.endsWith(".")) { + host = host.substring(0, host.length() - 1); + } + String[] hostStr = host.split("\\."); + int length = hostStr.length; + if (hostStr.length >= 2) { + if (PublicSuffixSet.contains(hostStr[length - 2])) { + return hostStr[length - 3] + "." + hostStr[length - 2] + "." + hostStr[length - 1]; + } else { + return hostStr[length - 2] + "." + hostStr[length - 1]; + } + } else { + return host; + } + } + + + // 定义正则表达式,域名的根需要自定义,这里不全 + private static final String RE_TOP = "[\\w-]+\\.(com.cn|net.cn|gov.cn|org\\.nz|org.cn|com|net|org|gov|cc|biz|info|cn|co)\\b()*"; + private static Pattern pattern = Pattern.compile(RE_TOP, Pattern.CASE_INSENSITIVE); + + public static String getTopDomain(String url) { + String result = url; + try { + Matcher matcher = pattern.matcher(url); + matcher.find(); + result = matcher.group(); + } catch (Exception e) { + System.out.println("[getTopDomain ERROR]====>"); + e.printStackTrace(); + } + return result; + } } diff --git a/src/test/java/cn/ac/iie/test/TestThread.java b/src/test/java/cn/ac/iie/test/TestThread.java index 5938b8a..89aa655 100644 --- a/src/test/java/cn/ac/iie/test/TestThread.java +++ b/src/test/java/cn/ac/iie/test/TestThread.java @@ -1,3 +1,4 @@ +/* package cn.ac.iie.test; @@ -10,7 +11,7 @@ class RunnableDemo implements Runnable { private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); static { - zookeeperUtils.connectZookeeper("192.168.40.207:2181"); + zookeeperUtils.connectZookeeper(); } @Override @@ -47,3 +48,4 @@ public class TestThread { } } +*/ diff --git a/src/test/java/cn/ac/iie/test/UtilTest.java b/src/test/java/cn/ac/iie/test/UtilTest.java new file mode 100644 index 0000000..c03b75d --- /dev/null +++ b/src/test/java/cn/ac/iie/test/UtilTest.java @@ -0,0 +1,27 @@ +package cn.ac.iie.test; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.json.JsonParseUtil; + +import java.util.ArrayList; +import java.util.HashMap; + +public class UtilTest { + + + private static HashMap map = JsonParseUtil.getMapFromhttp(FlowWriteConfig.SCHEMA_HTTP); + //反射成一个类 + private static Object mapObject = JsonParseUtil.generateObject(map); + //获取任务列表 + private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + public static void main(String[] args) { + + + for (String[] strings : jobList) { + System.out.println(strings[0]); + System.out.println(strings[1]); + System.out.println(strings[2]); + } + } +} diff --git a/src/test/java/cn/ac/iie/test/a.json b/src/test/java/cn/ac/iie/test/a.json new file mode 100644 index 0000000..e80ed52 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/a.json @@ -0,0 +1 @@ +{"common_stream_dir":3,"common_address_type":4,"common_client_ip":"82.200.242.225","common_server_ip":"82.200.242.69","common_client_port":59387,"common_server_port":1812,"common_c2s_pkt_num":2,"common_s2c_pkt_num":1,"common_c2s_byte_num":507,"common_s2c_byte_num":151,"common_start_time":1575534194,"common_end_time":1575534195,"common_con_duration_ms":1000,"common_stream_trace_id":0,"common_l4_protocol":"IPv4_UDP","common_address_list":"59387-1812-82.200.242.225-82.200.242.69","radius_packet_type":1,"radius_account":"Kuanysh79143","radius_service_type":2,"radius_acct_session_id":"473332153","radius_framed_ip":"82.200.242.225","common_policy_id":0,"common_service":162,"common_entrance_id":0,"common_direction":0,"common_device_id":0,"common_encapsulation":14,"common_link_id":0,"common_sled_ip":"192.168.40.119","common_schema_type":"RADIUS"} \ No newline at end of file diff --git a/src/test/java/cn/ac/iie/test/bean/Student.java b/src/test/java/cn/ac/iie/test/bean/Student.java new file mode 100644 index 0000000..3383bc0 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/bean/Student.java @@ -0,0 +1,22 @@ +package cn.ac.iie.test.bean; + +public class Student { + private String name; + private Integer age; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getAge() { + return age; + } + + public void setAge(Integer age) { + this.age = age; + } +} diff --git a/src/test/java/cn/ac/iie/test/hbase/HBaseTest.java b/src/test/java/cn/ac/iie/test/hbase/HBaseTest.java new file mode 100644 index 0000000..26d9d09 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/hbase/HBaseTest.java @@ -0,0 +1,147 @@ +package cn.ac.iie.test.hbase; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.system.IpUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; + +public class HBaseTest { + private final static Logger logger = Logger.getLogger(HBaseTest.class); +// private static Map subIdMap = new ConcurrentHashMap(13333334); + private static Map subIdMap = new HashMap<>(13333334); + private static Map testMap = new ConcurrentSkipListMap<>(); + private static Connection connection; + private static Long time; + + + static { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 +// configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS); +// configuration.set("hbase.zookeeper.quorum", "192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181"); + configuration.set("hbase.zookeeper.quorum", "192.168.40.186:2182"); + configuration.set("hbase.client.retries.number", "3"); + configuration.set("hbase.bulkload.retries.number", "3"); + configuration.set("zookeeper.recovery.retry", "3"); + try { + connection = ConnectionFactory.createConnection(configuration); + time = System.currentTimeMillis(); + } catch (IOException e) { + logger.error("获取HBase连接失败" + e); + e.printStackTrace(); + } + } + + @Test + public void change() { + Long begin = System.currentTimeMillis(); + getAll(); + System.out.println(System.currentTimeMillis() - begin); + } + + /** + * 获取变更内容 + * + * @param startTime 开始时间 + * @param endTime 结束时间 + */ + private static void timestampsFilter(Long startTime, Long endTime) { + Long begin = System.currentTimeMillis(); + Table table = null; + ResultScanner scanner = null; + Scan scan2 = new Scan(); + try { + table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME)); + scan2.setTimeRange(startTime, endTime); + scanner = table.getScanner(scan2); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { +// int key = Integer.parseInt(Bytes.toString(CellUtil.cloneRow(cell))); + String key = Bytes.toString(CellUtil.cloneRow(cell)); + String value = Bytes.toString(CellUtil.cloneValue(cell)); + if (subIdMap.containsKey(key)) { + if (!value.equals(subIdMap.get(key))) { + subIdMap.put(key, value); + } + } else { + subIdMap.put(key, value); + } + } + } + Long end = System.currentTimeMillis(); + logger.warn("当前集合长度" + subIdMap.keySet().size()); + logger.warn("更新缓存耗时:" + (end - begin) + "开始时间:" + begin + "结束时间:" + end); + time = endTime; + } catch (IOException e) { + e.printStackTrace(); + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + try { + table.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + /** + * 获取所有的 key value + */ + private static void getAll() { + Long begin = System.currentTimeMillis(); + try { + Table table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME)); + Scan scan2 = new Scan(); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { +// subIdMap.put(Integer.valueOf(Bytes.toString(CellUtil.cloneRow(cell))), Bytes.toString(CellUtil.cloneValue(cell))); +// subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + testMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + } + } +// logger.warn("获取全量后集合长度:" + subIdMap.size()); + logger.warn("获取全量后集合长度:" + testMap.size()); + logger.warn("获取全量耗时:" + (System.currentTimeMillis() - begin)); + scanner.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 获取 account + * + * @param clientIp client_ip + * @return account + */ + public static String getAccount(String clientIp) { + int ipType = cn.ac.iie.utils.system.IpUtils.validIPAddress(clientIp); + String account = ""; + if (ipType == FlowWriteConfig.IPV4_TYPE) { + account = subIdMap.get(IpUtils.ipChangeInt(clientIp)); + } else if (ipType == FlowWriteConfig.IPV6_TYPE) { + account = subIdMap.get(clientIp); + } + return account; + } +} diff --git a/src/test/java/cn/ac/iie/test/hbase/IpUtils.java b/src/test/java/cn/ac/iie/test/hbase/IpUtils.java new file mode 100644 index 0000000..1cfebe0 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/hbase/IpUtils.java @@ -0,0 +1,63 @@ +package cn.ac.iie.test.hbase; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.general.EncryptionUtils; +import org.apache.log4j.Logger; +import org.slf4j.LoggerFactory; + + +public class IpUtils { + private static Logger logger = Logger.getLogger(IpUtils.class); + + public static void main(String[] args) { + System.out.println(System.currentTimeMillis()); + System.out.println(System.currentTimeMillis() - 60000); + } + + /** + * ip字符串转整数 + * ip是.分割的整数字符串,按照r进制转十进制的规律,按权相加求和,这里的权是256. + * + * @param ip + * @return + */ + public static int ip2Int(String ip) { + String[] ipStrs = ip.split("\\.");//分割ip + int result = 0; + for (int i = 0; i < 4; i++) { + Integer ipSubInteger = Integer.parseInt(ipStrs[i]); + if (ipSubInteger > 255) {//正则验证不能为负数 + result = 0; + break; + } + result += (ipSubInteger << (24 - i * 8)); + } + return result; + } + + + /** + * 整数转ip + * + * @param ip + * @return + */ + public static String int2Ip(int ip) { + StringBuilder builder = new StringBuilder(String.valueOf(ip >>> 24)); + builder.append("."); + builder.append(String.valueOf((ip & 0X00FFFFFF) >>> 16)); + builder.append("."); + builder.append(String.valueOf((ip & 0X0000FFFF) >>> 8)); + builder.append("."); + builder.append(String.valueOf(ip & 0X000000FF)); + return builder.toString(); + } + + + public static int validIPAddress(String ip) { + String ipv4 = "^((\\d|[1-9]\\d|1\\d\\d|2([0-4]\\d|5[0-5]))\\.){4}$"; + //8个1-4位+: + String ipv6 = "^(([\\da-fA-F]{1,4}):){8}$"; + return String.format("%s.", ip).matches(ipv4) ? 1 : String.format("%s:", ip).matches(ipv6) ? 2 : 3; + } +} diff --git a/src/test/java/cn/ac/iie/test/test.java b/src/test/java/cn/ac/iie/test/test.java index 9d2d332..d4ba3c2 100644 --- a/src/test/java/cn/ac/iie/test/test.java +++ b/src/test/java/cn/ac/iie/test/test.java @@ -1,46 +1,53 @@ package cn.ac.iie.test; -import cn.ac.iie.bean.SessionRecordLog; import cn.ac.iie.common.FlowWriteConfig; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.zdjizhi.utils.IpLookup; +import com.google.common.net.InternetDomainName; +import com.zdjizhi.utils.*; +import org.apache.log4j.Logger; +import org.apache.storm.shade.com.google.common.collect.Lists; import org.junit.Test; -import javax.servlet.http.HttpServletRequest; -import java.math.BigInteger; -import java.util.Arrays; +import java.io.File; +import java.util.ArrayList; public class test { - public static void main(String[] args) { - String message = "{\"str_ea_m-t-r-a-ceid\":\"JSON\"}"; - SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class); - System.out.println(sessionRecordLog.getStream_trace_id()); + private static Logger logger = Logger.getLogger(test.class); - String message2 = "{\"streamtraceid\":\"JSON\"}"; - SessionRecordLog sessionRecordLog2 = JSONObject.parseObject(message2, SessionRecordLog.class); - System.out.println(sessionRecordLog2.getStream_trace_id()); - - JSONObject jsonObject = JSON.parseObject(message); - System.out.println("\n" + Arrays.toString(jsonObject.keySet().toArray())); - - HttpServletRequest request = null; - if (request != null) { - String contextPath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + request.getContextPath(); - } - - System.out.println(System.currentTimeMillis() / 1000); - } + static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); +// @Test - public void test2() { -// String minTimeStampStr = "00000000000000000000000000000000000000000"; - String minTimeStampStr = "000000000000000000000000000000000000000"; - long minTimeStamp = new BigInteger(minTimeStampStr, 2).longValue(); -// String maxTimeStampStr = "11111111111111111111111111111111111111111"; - String maxTimeStampStr = "111111111111111111111111111111111111111"; - long maxTimeStamp = new BigInteger(maxTimeStampStr, 2).longValue(); - long oneYearMills = 1L * 1000 * 60 * 60 * 24 * 365; - System.out.println((maxTimeStamp - minTimeStamp) / oneYearMills); + public void test() throws InterruptedException { + File file = new File("D:\\123\\test.txt"); + String zookeeperIp ="192.168.40.224:2181"; + String kafkaTopic ="CONNECTION-RECORD-LOG"; + + System.out.println(zookeeperUtils.modifyNode("/Snowflake/" + kafkaTopic, zookeeperIp)); + System.out.println(zookeeperUtils.modifyNode("/Snowflake/" + kafkaTopic, zookeeperIp)); + +// ArrayList list = Lists.newArrayList(); +// for (int i = 1; i <= 500; i++) { +// ZooKeeperLock lock = new ZooKeeperLock(zookeeperIp, "/locks", "disLocks1"); +// if (lock.lock()) { +// int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + kafkaTopic, zookeeperIp); +// Long generateId = SnowflakeId.generateId(tmpWorkerId, 12); +// System.err.println(generateId); +// list.add(generateId); +// lock.unlock(); +// } +// if(i%5==0) { +//// fileWrite(list, file); +// Thread.sleep(1000); +// } +// } +// System.err.println("第2个进程结束"); + +// FormatUtils build = new FormatUtils.Builder(false).build(); +// long snowflakeId = build.getSnowflakeId("192.168.40.224:2181", "CONNECTION-RECORD-LOG", 12); +// System.err.println(snowflakeId); + + } } diff --git a/src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java b/src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java index f442ebe..1423f98 100644 --- a/src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java +++ b/src/test/java/cn/ac/iie/test/zookeeper/DistributedLock.java @@ -1,26 +1,182 @@ package cn.ac.iie.test.zookeeper; +import cn.ac.iie.common.FlowWriteConfig; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; -public interface DistributedLock { - /** - * 获取锁,如果没有得到就等待 - */ - public void acquire() throws Exception; +public class DistributedLock implements Lock, Watcher { + private ZooKeeper zk; + private String root = "/locks";//根 + private String lockName;//竞争资源的标志 + private String waitNode;//等待前一个锁 + private String myZnode;//当前锁 + private CountDownLatch latch;//计数器 + private CountDownLatch connectedSignal = new CountDownLatch(1); + private int sessionTimeout = 2000; + public static void main(String[] args) { + DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); + lock.lock(); + System.out.println(1); + if (lock!=null){ + lock.unlock(); + } + } /** - * 获取锁,直到超时 + * 创建分布式锁,使用前请确认config配置的zookeeper服务可用 * - * @param unit time参数的单位 - * @throws Exception - * @return是否获取到锁 + * @param config 192.168.1.127:2181 + * @param lockName 竞争资源标志,lockName中不能包含单词_lock_ */ - public boolean acquire(long time, TimeUnit unit) throws Exception; + public DistributedLock(String config, String lockName) { + this.lockName = lockName; + // 创建一个与服务器的连接 + try { + zk = new ZooKeeper(config, sessionTimeout, this); + connectedSignal.await(); + Stat stat = zk.exists(root, false);//此去不执行 Watcher + if (stat == null) { + // 创建根节点 + zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (IOException e) { + throw new LockException(e); + } catch (KeeperException e) { + throw new LockException(e); + } catch (InterruptedException e) { + throw new LockException(e); + } + } /** - * 释放锁 - * - * @throws Exception + * zookeeper节点的监视器 */ - public void release() throws Exception; -} + public void process(WatchedEvent event) { + //建立连接用 + if (event.getState() == Event.KeeperState.SyncConnected) { + connectedSignal.countDown(); + return; + } + //其他线程放弃锁的标志 + if (this.latch != null) { + this.latch.countDown(); + } + } + + public void lock() { + try { + if (this.tryLock()) { + System.out.println("Thread " + Thread.currentThread().getId() + " " + myZnode + " get lock true"); + return; + } else { + waitForLock(waitNode, sessionTimeout);//等待锁 + } + } catch (KeeperException e) { + throw new LockException(e); + } catch (InterruptedException e) { + throw new LockException(e); + } + } + + public boolean tryLock() { + try { + String splitStr = "_lock_"; + if (lockName.contains(splitStr)) + throw new LockException("lockName can not contains \\u000B"); + //创建临时子节点 + myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + System.out.println(myZnode + " is created "); + //取出所有子节点 + List subNodes = zk.getChildren(root, false); + //取出所有lockName的锁 + List lockObjNodes = new ArrayList(); + for (String node : subNodes) { + String _node = node.split(splitStr)[0]; + if (_node.equals(lockName)) { + lockObjNodes.add(node); + } + } + Collections.sort(lockObjNodes); + + if (myZnode.equals(root + "/" + lockObjNodes.get(0))) { + //如果是最小的节点,则表示取得锁 + System.out.println(myZnode + "==" + lockObjNodes.get(0)); + return true; + } + //如果不是最小的节点,找到比自己小1的节点 + String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); + waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);//找到前一个子节点 + } catch (KeeperException e) { + throw new LockException(e); + } catch (InterruptedException e) { + throw new LockException(e); + } + return false; + } + + public boolean tryLock(long time, TimeUnit unit) { + try { + if (this.tryLock()) { + return true; + } + return waitForLock(waitNode, time); + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { + Stat stat = zk.exists(root + "/" + lower, true);//同时注册监听。 + //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 + if (stat != null) { + System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); + this.latch = new CountDownLatch(1); + this.latch.await(waitTime, TimeUnit.MILLISECONDS);//等待,这里应该一直等待其他线程释放锁 + this.latch = null; + } + return true; + } + + public void unlock() { + try { + System.out.println("unlock " + myZnode); + zk.delete(myZnode, -1); + myZnode = null; + zk.close(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (KeeperException e) { + e.printStackTrace(); + } + } + + public void lockInterruptibly() throws InterruptedException { + this.lock(); + } + + public Condition newCondition() { + return null; + } + + public class LockException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public LockException(String e) { + super(e); + } + + public LockException(Exception e) { + super(e); + } + } +} \ No newline at end of file diff --git a/src/test/java/cn/ac/iie/test/zookeeper/RandomTest.java b/src/test/java/cn/ac/iie/test/zookeeper/RandomTest.java new file mode 100644 index 0000000..3730073 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/zookeeper/RandomTest.java @@ -0,0 +1,8 @@ +package cn.ac.iie.test.zookeeper; + +public class RandomTest { + public static void main(String[] args) { + + + } +}