From 2c307deba43767ac384c014993d00cac776a1bde Mon Sep 17 00:00:00 2001 From: qidaijie Date: Mon, 5 Aug 2019 17:50:15 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97=E8=A1=A5=E5=85=A8=E7=A8=8B?= =?UTF-8?q?=E5=BA=8F=E5=88=9D=E5=A7=8B=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 186 +++++ properties/redis_config.properties | 19 + properties/service_flow_config.properties | 66 ++ .../java/cn/ac/iie/bean/SessionRecordLog.java | 653 ++++++++++++++++++ .../cn/ac/iie/bolt/ConnCompletionBolt.java | 48 ++ .../java/cn/ac/iie/bolt/NtcLogSendBolt.java | 71 ++ .../cn/ac/iie/common/FlowWriteConfig.java | 63 ++ .../cn/ac/iie/spout/CustomizedKafkaSpout.java | 80 +++ .../ac/iie/topology/LogFlowWriteTopology.java | 86 +++ .../java/cn/ac/iie/topology/StormRunner.java | 35 + .../ac/iie/utils/general/EncryptionUtils.java | 68 ++ .../ac/iie/utils/general/TransFormUtils.java | 115 +++ .../ac/iie/utils/influxdb/InfluxDbUtils.java | 48 ++ .../cn/ac/iie/utils/kafka/KafkaLogNtc.java | 83 +++ .../ac/iie/utils/redis/RedisClusterUtils.java | 79 +++ .../cn/ac/iie/utils/redis/RedisPollUtils.java | 115 +++ .../utils/system/FlowWriteConfigurations.java | 65 ++ .../cn/ac/iie/utils/system/SnowflakeId.java | 202 ++++++ .../cn/ac/iie/utils/system/TupleUtils.java | 23 + .../iie/utils/zookeeper/ZookeeperUtils.java | 123 ++++ src/main/java/log4j.properties | 23 + src/test/java/cn/ac/iie/test/DomainUtils.java | 37 + .../cn/ac/iie/test/SnowflakeIdWorker.java | 182 +++++ src/test/java/cn/ac/iie/test/TestThread.java | 49 ++ src/test/java/cn/ac/iie/test/URLUtil.java | 90 +++ .../java/cn/ac/iie/test/ZookeeperTest.java | 126 ++++ src/test/java/cn/ac/iie/test/a.xml | 12 + .../java/cn/ac/iie/test/influxQueryTest.java | 52 ++ src/test/java/cn/ac/iie/test/test.java | 46 ++ 29 files changed, 2845 insertions(+) create mode 100644 pom.xml create mode 100644 properties/redis_config.properties create mode 100644 properties/service_flow_config.properties create mode 100644 src/main/java/cn/ac/iie/bean/SessionRecordLog.java create mode 100644 src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java create mode 100644 src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java create mode 100644 src/main/java/cn/ac/iie/common/FlowWriteConfig.java create mode 100644 src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java create mode 100644 src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java create mode 100644 src/main/java/cn/ac/iie/topology/StormRunner.java create mode 100644 src/main/java/cn/ac/iie/utils/general/EncryptionUtils.java create mode 100644 src/main/java/cn/ac/iie/utils/general/TransFormUtils.java create mode 100644 src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java create mode 100644 src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java create mode 100644 src/main/java/cn/ac/iie/utils/redis/RedisClusterUtils.java create mode 100644 src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java create mode 100644 src/main/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java create mode 100644 src/main/java/cn/ac/iie/utils/system/SnowflakeId.java create mode 100644 src/main/java/cn/ac/iie/utils/system/TupleUtils.java create mode 100644 src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java create mode 100644 src/main/java/log4j.properties create mode 100644 src/test/java/cn/ac/iie/test/DomainUtils.java create mode 100644 src/test/java/cn/ac/iie/test/SnowflakeIdWorker.java create mode 100644 src/test/java/cn/ac/iie/test/TestThread.java create mode 100644 src/test/java/cn/ac/iie/test/URLUtil.java create mode 100644 src/test/java/cn/ac/iie/test/ZookeeperTest.java create mode 100644 src/test/java/cn/ac/iie/test/a.xml create mode 100644 src/test/java/cn/ac/iie/test/influxQueryTest.java create mode 100644 src/test/java/cn/ac/iie/test/test.java diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..2e03234 --- /dev/null +++ b/pom.xml @@ -0,0 +1,186 @@ + + 4.0.0 + + cn.ac.iie + log-stream-completion + 0.0.1-SNAPSHOT + jar + + log-stream-completion + http://maven.apache.org + + + + nexus + Team Nexus Repository + http://192.168.10.125:8099/content/groups/public + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.2 + + + package + + shade + + + + + cn.ac.iie.topology.LogFlowWriteTopology + + + META-INF/spring.handlers + + + META-INF/spring.schemas + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + 1.8 + 1.8 + + + + + + properties + + **/*.properties + + false + + + src/main/java + + log4j.properties + + false + + + + + + UTF-8 + 1.0.0 + 1.0.2 + + + + + + org.apache.kafka + kafka_2.11 + 1.0.0 + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.storm + storm-core + ${storm.version} + provided + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + org.influxdb + influxdb-java + 2.1 + + + + org.apache.storm + storm-kafka + ${storm.version} + + + + redis.clients + jedis + 2.8.1 + + + + junit + junit + 4.12 + test + + + + com.alibaba + fastjson + 1.2.47 + + + + com.zdjizhi + galaxy + 1.0.1 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + org.apache.zookeeper + zookeeper + 3.4.9 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + + diff --git a/properties/redis_config.properties b/properties/redis_config.properties new file mode 100644 index 0000000..f99d396 --- /dev/null +++ b/properties/redis_config.properties @@ -0,0 +1,19 @@ +#*****************jedis连接参数设置********************* +#redis服务器ip +redis.ip=192.168.40.123 +#redis服务器端口号 +redis.port=6379 +#与服务器建立连接的超时时间 +redis.timeout=3000 +#************************jedis池参数设置******************* +#jedis的最大活跃连接数 +redis.pool.maxActive=200 +#jedis最大空闲连接数 +redis.pool.maxIdle=5 +#jedis池没有连接对象返回时,等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。 +#如果超过等待时间,则直接抛出JedisConnectionException +redis.pool.maxWait=-1 +#从池中获取连接的时候,是否进行有效检查 +redis.pool.testOnBorrow=true +#归还连接的时候,是否进行有效检查 +redis.pool.testOnReturn=true diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties new file mode 100644 index 0000000..8bfb59b --- /dev/null +++ b/properties/service_flow_config.properties @@ -0,0 +1,66 @@ +#管理kafka地址 +#bootstrap.servers=10.4.35.7:9092,10.4.35.8:9092,10.4.35.9:9092 +bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092 + +#zookeeper 地址 +zookeeper.servers=192.168.40.207:2181 + +#latest/earliest +auto.offset.reset=latest + +#kafka broker下的topic名称 +kafka.topic=SESSION-TEST-LOG + +#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; +group.id=session-record-log-g + +#输出topic +results.output.topic=SESSION-TEST-COMPLETED-LOG + +#storm topology workers +topology.workers=6 + +#spout并行度 建议与kafka分区数相同 +spout.parallelism=12 + +#处理补全操作的bolt并行度-worker的倍数 +datacenter.bolt.parallelism=30 + +#写入kafkad的并行度 +kafka.bolt.parallelism=30 + +#定位库地址 +ip.library=/home/ceiec/topology/dat/ + +#kafka批量条数 +batch.insert.num=5000 + +#数据中心(UID) +data.center.id.num=15 + +#tick时钟频率 +topology.tick.tuple.freq.secs=5 + +#当bolt性能受限时,限制spout接收速度,理论看ack开启才有效 +topology.config.max.spout.pending=150000 + +#ack设置 1启动ack 0不启动ack +topology.num.acks=0 + +#spout接收睡眠时间 +topology.spout.sleep.time=1 + +#用于过滤对准用户名 +check.ip.scope=10,100,192 + +#允许发送kafka最大失败数 +max.failure.num=20 + +#influx地址 +influx.ip=http://10.0.5.19:8086 + +#influx用户名 +influx.username=admin + +#influx密码 +influx.password=admin \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/bean/SessionRecordLog.java b/src/main/java/cn/ac/iie/bean/SessionRecordLog.java new file mode 100644 index 0000000..5385883 --- /dev/null +++ b/src/main/java/cn/ac/iie/bean/SessionRecordLog.java @@ -0,0 +1,653 @@ +package cn.ac.iie.bean; + +import com.alibaba.fastjson.annotation.JSONField; +import com.alibaba.fastjson.support.spring.annotation.FastJsonFilter; + +/** + * @author qidaijie + */ +public class SessionRecordLog { + private long uid; + private int policy_id; + private long action; + private int start_time; + private int end_time; + private long recv_time; + private String trans_proto; + private String app_proto; + private int addr_type; + private String server_ip; + private String client_ip; + private int server_port; + private int client_port; + private int service; + private int entrance_id; + private int device_id; + private int Link_id; + private String isp; + private int encap_type; + private int direction; + private int stream_dir; + private String cap_ip; + private String addr_list; + private String server_location; + private String client_location; + private String client_asn; + private String server_asn; + private String subscribe_id; + private long con_duration_ms; + private String url; + private String host; + private String domain; + private String category; + private String req_line; + private String res_line; + private String cookie; + private String referer; + private String user_agent; + private String content_len; + private String content_type; + private String set_cookie; + private String req_header; + private String resp_header; + private String req_body_key; + private String req_body; + private String res_body_key; + private String resp_body; + private String version; + private String sni; + private String san; + private String cn; + private int app_id; + private int protocol_id; + private long con_latency_ms; + private int pinningst; + private int intercept_state; + private long ssl_server_side_latency; + private long ssl_client_side_latency; + private String ssl_server_side_version; + private String ssl_client_side_version; + private int ssl_cert_verify; + private String stream_trace_id; + private String ssl_error; + private long c2s_pkt_num; + private long S2c_pkt_num; + private long c2s_byte_num; + private long s2c_byte_num; + private String nas_ip; + private String framed_ip; + private String account; + private int packet_type; + + public SessionRecordLog() { + } + + public long getUid() { + return uid; + } + + public void setUid(long uid) { + this.uid = uid; + } + + public int getPolicy_id() { + return policy_id; + } + + public void setPolicy_id(int policy_id) { + this.policy_id = policy_id; + } + + public long getAction() { + return action; + } + + public void setAction(long action) { + this.action = action; + } + + public int getStart_time() { + return start_time; + } + + public void setStart_time(int start_time) { + this.start_time = start_time; + } + + public int getEnd_time() { + return end_time; + } + + public void setEnd_time(int end_time) { + this.end_time = end_time; + } + + public String getSsl_error() { + return ssl_error; + } + + public void setSsl_error(String ssl_error) { + this.ssl_error = ssl_error; + } + + public String getApp_proto() { + return app_proto; + } + + public void setApp_proto(String app_proto) { + this.app_proto = app_proto; + } + + public long getRecv_time() { + return recv_time; + } + + public void setRecv_time(long recv_time) { + this.recv_time = recv_time; + } + + public String getTrans_proto() { + return trans_proto; + } + + public void setTrans_proto(String trans_proto) { + this.trans_proto = trans_proto; + } + + public int getAddr_type() { + return addr_type; + } + + public void setAddr_type(int addr_type) { + this.addr_type = addr_type; + } + + public String getServer_ip() { + return server_ip; + } + + public void setServer_ip(String server_ip) { + this.server_ip = server_ip; + } + + public String getClient_ip() { + return client_ip; + } + + public void setClient_ip(String client_ip) { + this.client_ip = client_ip; + } + + public int getServer_port() { + return server_port; + } + + public void setServer_port(int server_port) { + this.server_port = server_port; + } + + public int getClient_port() { + return client_port; + } + + public void setClient_port(int client_port) { + this.client_port = client_port; + } + + public int getService() { + return service; + } + + public void setService(int service) { + this.service = service; + } + + public int getEntrance_id() { + return entrance_id; + } + + public void setEntrance_id(int entrance_id) { + this.entrance_id = entrance_id; + } + + public int getDevice_id() { + return device_id; + } + + public void setDevice_id(int device_id) { + this.device_id = device_id; + } + + public int getLink_id() { + return Link_id; + } + + public void setLink_id(int link_id) { + Link_id = link_id; + } + + public String getIsp() { + return isp; + } + + public void setIsp(String isp) { + this.isp = isp; + } + + public int getEncap_type() { + return encap_type; + } + + public void setEncap_type(int encap_type) { + this.encap_type = encap_type; + } + + public int getDirection() { + return direction; + } + + public void setDirection(int direction) { + this.direction = direction; + } + + public int getStream_dir() { + return stream_dir; + } + + public void setStream_dir(int stream_dir) { + this.stream_dir = stream_dir; + } + + public String getCap_ip() { + return cap_ip; + } + + public void setCap_ip(String cap_ip) { + this.cap_ip = cap_ip; + } + + public String getAddr_list() { + return addr_list; + } + + public void setAddr_list(String addr_list) { + this.addr_list = addr_list; + } + + public String getServer_location() { + return server_location; + } + + public void setServer_location(String server_location) { + this.server_location = server_location; + } + + public String getClient_location() { + return client_location; + } + + public void setClient_location(String client_location) { + this.client_location = client_location; + } + + public String getClient_asn() { + return client_asn; + } + + public void setClient_asn(String client_asn) { + this.client_asn = client_asn; + } + + public String getServer_asn() { + return server_asn; + } + + public void setServer_asn(String server_asn) { + this.server_asn = server_asn; + } + + public String getSubscribe_id() { + return subscribe_id; + } + + public void setSubscribe_id(String subscribe_id) { + this.subscribe_id = subscribe_id; + } + + public long getCon_duration_ms() { + return con_duration_ms; + } + + public void setCon_duration_ms(long con_duration_ms) { + this.con_duration_ms = con_duration_ms; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getDomain() { + return domain; + } + + public void setDomain(String domain) { + this.domain = domain; + } + + public String getCategory() { + return category; + } + + public void setCategory(String category) { + this.category = category; + } + + public String getReq_line() { + return req_line; + } + + public void setReq_line(String req_line) { + this.req_line = req_line; + } + + public String getRes_line() { + return res_line; + } + + public void setRes_line(String res_line) { + this.res_line = res_line; + } + + public String getCookie() { + return cookie; + } + + public void setCookie(String cookie) { + this.cookie = cookie; + } + + public String getReferer() { + return referer; + } + + public void setReferer(String referer) { + this.referer = referer; + } + + public String getUser_agent() { + return user_agent; + } + + public void setUser_agent(String user_agent) { + this.user_agent = user_agent; + } + + public String getContent_len() { + return content_len; + } + + public void setContent_len(String content_len) { + this.content_len = content_len; + } + + public String getContent_type() { + return content_type; + } + + public void setContent_type(String content_type) { + this.content_type = content_type; + } + + public String getSet_cookie() { + return set_cookie; + } + + public void setSet_cookie(String set_cookie) { + this.set_cookie = set_cookie; + } + + public String getReq_header() { + return req_header; + } + + public void setReq_header(String req_header) { + this.req_header = req_header; + } + + public String getResp_header() { + return resp_header; + } + + public void setResp_header(String resp_header) { + this.resp_header = resp_header; + } + + public String getReq_body_key() { + return req_body_key; + } + + public void setReq_body_key(String req_body_key) { + this.req_body_key = req_body_key; + } + + public String getReq_body() { + return req_body; + } + + public void setReq_body(String req_body) { + this.req_body = req_body; + } + + public String getRes_body_key() { + return res_body_key; + } + + public void setRes_body_key(String res_body_key) { + this.res_body_key = res_body_key; + } + + public String getResp_body() { + return resp_body; + } + + public void setResp_body(String resp_body) { + this.resp_body = resp_body; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getSni() { + return sni; + } + + public void setSni(String sni) { + this.sni = sni; + } + + public String getSan() { + return san; + } + + public void setSan(String san) { + this.san = san; + } + + public String getCn() { + return cn; + } + + public void setCn(String cn) { + this.cn = cn; + } + + public int getApp_id() { + return app_id; + } + + public void setApp_id(int app_id) { + this.app_id = app_id; + } + + public int getProtocol_id() { + return protocol_id; + } + + public void setProtocol_id(int protocol_id) { + this.protocol_id = protocol_id; + } + + public int getIntercept_state() { + return intercept_state; + } + + public void setIntercept_state(int intercept_state) { + this.intercept_state = intercept_state; + } + + public long getSsl_server_side_latency() { + return ssl_server_side_latency; + } + + public void setSsl_server_side_latency(long ssl_server_side_latency) { + this.ssl_server_side_latency = ssl_server_side_latency; + } + + public long getSsl_client_side_latency() { + return ssl_client_side_latency; + } + + public void setSsl_client_side_latency(long ssl_client_side_latency) { + this.ssl_client_side_latency = ssl_client_side_latency; + } + + public String getSsl_server_side_version() { + return ssl_server_side_version; + } + + public void setSsl_server_side_version(String ssl_server_side_version) { + this.ssl_server_side_version = ssl_server_side_version; + } + + public String getSsl_client_side_version() { + return ssl_client_side_version; + } + + public void setSsl_client_side_version(String ssl_client_side_version) { + this.ssl_client_side_version = ssl_client_side_version; + } + + public int getSsl_cert_verify() { + return ssl_cert_verify; + } + + public void setSsl_cert_verify(int ssl_cert_verify) { + this.ssl_cert_verify = ssl_cert_verify; + } + + public String getStream_trace_id() { + return stream_trace_id; + } + + public void setStream_trace_id(String stream_trace_id) { + this.stream_trace_id = stream_trace_id; + } + + public long getCon_latency_ms() { + return con_latency_ms; + } + + public void setCon_latency_ms(long con_latency_ms) { + this.con_latency_ms = con_latency_ms; + } + + public int getPinningst() { + return pinningst; + } + + public void setPinningst(int pinningst) { + this.pinningst = pinningst; + } + + + public long getC2s_pkt_num() { + return c2s_pkt_num; + } + + public void setC2s_pkt_num(long c2s_pkt_num) { + this.c2s_pkt_num = c2s_pkt_num; + } + + public long getS2c_pkt_num() { + return S2c_pkt_num; + } + + public void setS2c_pkt_num(long s2c_pkt_num) { + S2c_pkt_num = s2c_pkt_num; + } + + public long getC2s_byte_num() { + return c2s_byte_num; + } + + public void setC2s_byte_num(long c2s_byte_num) { + this.c2s_byte_num = c2s_byte_num; + } + + public long getS2c_byte_num() { + return s2c_byte_num; + } + + public void setS2c_byte_num(long s2c_byte_num) { + this.s2c_byte_num = s2c_byte_num; + } + + public String getNas_ip() { + return nas_ip; + } + + public void setNas_ip(String nas_ip) { + this.nas_ip = nas_ip; + } + + public String getFramed_ip() { + return framed_ip; + } + + public void setFramed_ip(String framed_ip) { + this.framed_ip = framed_ip; + } + + public String getAccount() { + return account; + } + + public void setAccount(String account) { + this.account = account; + } + + public int getPacket_type() { + return packet_type; + } + + public void setPacket_type(int packet_type) { + this.packet_type = packet_type; + } +} diff --git a/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java new file mode 100644 index 0000000..e67b6cf --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/ConnCompletionBolt.java @@ -0,0 +1,48 @@ +package cn.ac.iie.bolt; + +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + +import static cn.ac.iie.utils.general.TransFormUtils.getJsonMessage; + +/** + * 通联关系日志补全 + * + * @author qidaijie + */ +public class ConnCompletionBolt extends BaseBasicBolt { + private static final long serialVersionUID = -1059151670138465894L; + private final static Logger logger = Logger.getLogger(ConnCompletionBolt.class); + + @Override + public void prepare(Map stormConf, TopologyContext context) { + + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(getJsonMessage(message))); + } + } catch (Exception e) { + logger.error("接收解析过程出现异常", e); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("connLog")); + } + +} diff --git a/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java new file mode 100644 index 0000000..dcf62a1 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java @@ -0,0 +1,71 @@ +package cn.ac.iie.bolt; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.system.TupleUtils; +import cn.ac.iie.utils.kafka.KafkaLogNtc; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + * @date 2018/8/14 + */ +public class NtcLogSendBolt extends BaseBasicBolt { + private static final long serialVersionUID = 3940515789830317517L; + private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); + private List list; + private KafkaLogNtc kafkaLogNtc; + + + @Override + public void prepare(Map stormConf, TopologyContext context) { + list = new LinkedList<>(); + kafkaLogNtc = KafkaLogNtc.getInstance(); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + if (TupleUtils.isTick(tuple)) { + if (list.size() != 0) { + kafkaLogNtc.sendMessage(list); + list.clear(); + } + } else { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + list.add(message); + } + if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) { + kafkaLogNtc.sendMessage(list); + list.clear(); + } + } + } catch (Exception e) { + logger.error("日志发送Kafka过程出现异常 ", e); + e.printStackTrace(); + } + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap<>(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + return conf; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + } + +} diff --git a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java new file mode 100644 index 0000000..26e2173 --- /dev/null +++ b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java @@ -0,0 +1,63 @@ +package cn.ac.iie.common; + + +import cn.ac.iie.utils.system.FlowWriteConfigurations; + +/** + * @author Administrator + */ +public class FlowWriteConfig { + + public static final String LOG_STRING_SPLITTER = "\t"; + public static final String SQL_STRING_SPLITTER = "#"; + public static final String SEGMENTATION = ","; + + /** + * System + */ + public static final Integer SPOUT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "spout.parallelism"); + public static final Integer DATACENTER_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "datacenter.bolt.parallelism"); + public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers"); + public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism"); + public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs"); + public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending"); + public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks"); + public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time"); + public static final Integer BATCH_INSERT_NUM = FlowWriteConfigurations.getIntProperty(0, "batch.insert.num"); + public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num"); + public static final String CHECK_IP_SCOPE = FlowWriteConfigurations.getStringProperty(0, "check.ip.scope"); + public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num"); + + /** + * influxDB + */ + public static final String INFLUX_IP = FlowWriteConfigurations.getStringProperty(0, "influx.ip"); + public static final String INFLUX_USERNAME = FlowWriteConfigurations.getStringProperty(0, "influx.username"); + public static final String INFLUX_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "influx.password"); + + /** + * kafka + */ + public static final String BOOTSTRAP_SERVERS = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers"); + public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers"); + public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); + public static final String RESULTS_OUTPUT_TOPIC = FlowWriteConfigurations.getStringProperty(0, "results.output.topic"); + public static final String KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "kafka.topic"); + public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset"); + + public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library"); + + + /*** + * Redis + */ + public static final String REDIS_IP = "redis.ip"; + public static final String REDIS_PORT = "redis.port"; + public static final String REDIS_TIMEOUT = "redis.timeout"; + public static final String REDIS_POOL_MAXACTIVE = "redis.pool.maxActive"; + public static final String REDIS_POOL_MAXIDLE = "redis.pool.maxIdle"; + public static final String REDIS_POOL_MAXWAIT = "redis.pool.maxWait"; + public static final String REDIS_POOL_TESTONBORROW = "redis.pool.testOnBorrow"; + public static final String REDIS_POOL_TESTONRETURN = "redis.pool.testOnReturn"; + +} \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java new file mode 100644 index 0000000..0f806fa --- /dev/null +++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java @@ -0,0 +1,80 @@ +package cn.ac.iie.spout; + +import cn.ac.iie.common.FlowWriteConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.log4j.Logger; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.util.Collections; +import java.util.Map; +import java.util.Properties; + +/** + * kafkaSpout + * + * @author Administrator + */ +public class CustomizedKafkaSpout extends BaseRichSpout { + private static final long serialVersionUID = -3363788553406229592L; + private KafkaConsumer consumer; + private SpoutOutputCollector collector = null; + private TopologyContext context = null; + private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class); + + + private static Properties createConsumerConfig() { + Properties props = new Properties(); + props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS); + props.put("group.id", FlowWriteConfig.GROUP_ID); + props.put("session.timeout.ms", "60000"); + props.put("max.poll.records", 3000); + props.put("max.partition.fetch.bytes", 31457280); + props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + return props; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + // TODO Auto-generated method stub + this.collector = collector; + this.context = context; + Properties prop = createConsumerConfig(); + this.consumer = new KafkaConsumer<>(prop); + this.consumer.subscribe(Collections.singletonList(FlowWriteConfig.KAFKA_TOPIC)); + } + + @Override + public void close() { + consumer.close(); + } + + @Override + public void nextTuple() { + try { + // TODO Auto-generated method stub + ConsumerRecords records = consumer.poll(10000L); + Thread.sleep(FlowWriteConfig.TOPOLOGY_SPOUT_SLEEP_TIME); + for (ConsumerRecord record : records) { + this.collector.emit(new Values(record.value())); + } + } catch (Exception e) { + logger.error("KafkaSpout发送消息出现异常!", e); + e.printStackTrace(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // TODO Auto-generated method stub + declarer.declare(new Fields("source")); + } +} diff --git a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java new file mode 100644 index 0000000..8c16b16 --- /dev/null +++ b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java @@ -0,0 +1,86 @@ +package cn.ac.iie.topology; + + +import cn.ac.iie.bolt.ConnCompletionBolt; +import cn.ac.iie.bolt.NtcLogSendBolt; +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.spout.CustomizedKafkaSpout; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; + +/** + * Storm程序主类 + * + * @author Administrator + */ + +public class LogFlowWriteTopology { + private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class); + private final String topologyName; + private final Config topologyConfig; + private TopologyBuilder builder; + + private LogFlowWriteTopology() { + this(LogFlowWriteTopology.class.getSimpleName()); + } + + private LogFlowWriteTopology(String topologyName) { + this.topologyName = topologyName; + topologyConfig = createTopologConfig(); + } + + private Config createTopologConfig() { + Config conf = new Config(); + conf.setDebug(false); + conf.setMessageTimeoutSecs(60); + conf.setMaxSpoutPending(FlowWriteConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING); + conf.setNumAckers(FlowWriteConfig.TOPOLOGY_NUM_ACKS); + return conf; + } + + private void runLocally() throws InterruptedException { + topologyConfig.setMaxTaskParallelism(1); + StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600); + } + + private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); + //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌 + topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8); + StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig); + } + + private void buildTopology() { + builder = new TopologyBuilder(); + builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); + builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt"); + } + + public static void main(String[] args) throws Exception { + LogFlowWriteTopology csst = null; + boolean runLocally = true; + String parameter = "remote"; + int size = 2; + if (args.length >= size && parameter.equalsIgnoreCase(args[1])) { + runLocally = false; + csst = new LogFlowWriteTopology(args[0]); + } else { + csst = new LogFlowWriteTopology(); + } + + csst.buildTopology(); + + if (runLocally) { + logger.info("执行本地模式..."); + csst.runLocally(); + } else { + logger.info("执行远程部署模式..."); + csst.runRemotely(); + } + } +} diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/cn/ac/iie/topology/StormRunner.java new file mode 100644 index 0000000..f5094a4 --- /dev/null +++ b/src/main/java/cn/ac/iie/topology/StormRunner.java @@ -0,0 +1,35 @@ +package cn.ac.iie.topology; + + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; + +/** + * @author Administrator + */ +public final class StormRunner{ + private static final int MILLS_IN_SEC = 1000; + + private StormRunner() {} + + public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException { + + LocalCluster localCluster = new LocalCluster(); + localCluster.submitTopology(topologyName, conf, builder.createTopology()); + Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC); + localCluster.shutdown(); + + } + + public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + + StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); + } + + +} diff --git a/src/main/java/cn/ac/iie/utils/general/EncryptionUtils.java b/src/main/java/cn/ac/iie/utils/general/EncryptionUtils.java new file mode 100644 index 0000000..5c400cc --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/general/EncryptionUtils.java @@ -0,0 +1,68 @@ +package cn.ac.iie.utils.general; + +import org.apache.log4j.Logger; + +import java.security.MessageDigest; + +/** + * 描述:转换MD5工具类 + * + * @author Administrator + * @create 2018-08-13 15:11 + */ +public class EncryptionUtils { + private static Logger logger = Logger.getLogger(EncryptionUtils.class); + + public static String md5Encode(String msg) throws Exception { + try { + byte[] msgBytes = msg.getBytes("utf-8"); + /* + * 声明使用Md5算法,获得MessaDigest对象 + */ + MessageDigest md5 = MessageDigest.getInstance("MD5"); + /* + * 使用指定的字节更新摘要 + */ + md5.update(msgBytes); + /* + * 完成哈希计算,获得密文 + */ + byte[] digest = md5.digest(); + /* + * 以上两行代码等同于 + * byte[] digest = md5.digest(msgBytes); + */ + return byteArr2hexString(digest); + } catch (Exception e) { + logger.error("Error in conversion MD5! " + msg); + return ""; + } + } + + /** + * 将byte数组转化为16进制字符串形式 + * + * @param bys 字节数组 + * @return 字符串 + */ + private static String byteArr2hexString(byte[] bys) { + StringBuilder hexVal = new StringBuilder(); + int val = 0; + for (byte by : bys) { + //将byte转化为int 如果byte是一个负数就必须要和16进制的0xff做一次与运算 + val = ((int) by) & 0xff; + if (val < 16) { + hexVal.append("0"); + } + hexVal.append(Integer.toHexString(val)); + } + + return hexVal.toString(); + + } + + + public static void main(String[] args) { + + } +} diff --git a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java new file mode 100644 index 0000000..2d9e9bb --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java @@ -0,0 +1,115 @@ +package cn.ac.iie.utils.general; + +import cn.ac.iie.bean.SessionRecordLog; +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.redis.RedisPollUtils; +import cn.ac.iie.utils.system.SnowflakeId; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.IpLookup; +import com.zdjizhi.utils.StringUtil; +import org.apache.log4j.Logger; +import redis.clients.jedis.Jedis; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * 描述:转换或补全工具类 + * + * @author qidaijie + * @create 2018-08-13 15:11 + */ +public class TransFormUtils { + private static Logger logger = Logger.getLogger(TransFormUtils.class); + private static Pattern WEB_PATTERN = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)"); + private static IpLookup ipLookup = new IpLookup.Builder(false) + .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") + .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb") + .loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb") + .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb") + .build(); + + /** + * 解析日志,并补全 + * + * @param message 原始日志 + * @return 补全后的日志 + */ + public static String getJsonMessage(String message) { + SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class); + String serverIp = sessionRecordLog.getServer_ip(); + String clientIp = sessionRecordLog.getClient_ip(); + try { + sessionRecordLog.setUid(SnowflakeId.generateId()); + sessionRecordLog.setServer_location(ipLookup.countryLookup(serverIp)); + sessionRecordLog.setClient_location(ipLookup.cityLookupDetail(clientIp)); + sessionRecordLog.setClient_asn(ipLookup.asnLookup(clientIp, true)); + sessionRecordLog.setServer_asn(ipLookup.asnLookup(serverIp, true)); + sessionRecordLog.setDomain(getTopDomain(sessionRecordLog.getSni(), sessionRecordLog.getHost())); + sessionRecordLog.setRecv_time(System.currentTimeMillis() / 1000); +// sessionRecordLog.setSubscribe_id(getSubscribeId(clientIp)); + return JSONObject.toJSONString(sessionRecordLog); + } catch (Exception e) { + logger.error("日志解析过程出现异常", e); + return ""; + } + + } + + /** + * 有sni通过sni获取域名,有hots根据host获取域名 + * + * @param sni sni + * @param host host + * @return 顶级域名 + */ + private static String getTopDomain(String sni, String host) { + if (StringUtil.isNotBlank(sni)) { + return getDomain(sni); + } else if (StringUtil.isNotBlank(host)) { + return getDomain(host); + } else { + return ""; + } + } + + /** + * 获取用户名 + * + * @param key Sip + * @return SubscribeId + */ + private static String getSubscribeId(String key) { + String sub = ""; + try (Jedis jedis = RedisPollUtils.getJedis()) { + if (jedis != null) { + sub = jedis.get(key); + } + } catch (Exception e) { + logger.error("通过Redis获取用户名出现异常", e); + } + return sub; + } + + + /** + * 根据url截取顶级域名 + * + * @param url 网站url + * @return 顶级域名 + */ + private static String getDomain(String url) { + try { + Matcher matcher = WEB_PATTERN.matcher(url); + if (matcher.find()) { + return matcher.group(); + } + } catch (Exception e) { + e.printStackTrace(); + } + return ""; + } + + +} diff --git a/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java b/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java new file mode 100644 index 0000000..c16fffd --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/influxdb/InfluxDbUtils.java @@ -0,0 +1,48 @@ +package cn.ac.iie.utils.influxdb; + + +import cn.ac.iie.common.FlowWriteConfig; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Point; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * 写入influxDB工具类 + * + * @author antlee + * @date 2018/8/17 + */ +public class InfluxDbUtils { + /** + * 原始日志写入数据中心kafka失败标识 + */ + public static void sendKafkaFail() { + InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD); + Point point1 = Point.measurement("sendkafkafail") + .tag("topology", FlowWriteConfig.KAFKA_TOPIC) + .tag("hostname", getIp()) + .field("state", 1) + .build(); + client.write("kafka", "", point1); + } + + /** + * 获取本机IP + * + * @return IP地址 + */ + private static String getIp() { + InetAddress addr; + try { + addr = InetAddress.getLocalHost(); + return addr.getHostAddress(); + } catch (UnknownHostException e) { + e.printStackTrace(); + return null; + } + } + +} diff --git a/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java b/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java new file mode 100644 index 0000000..d16c79f --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java @@ -0,0 +1,83 @@ +package cn.ac.iie.utils.kafka; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.influxdb.InfluxDbUtils; +import org.apache.kafka.clients.producer.*; +import org.apache.log4j.Logger; + +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +/** + * NTC系统配置产生日志写入数据中心类 + * + * @author Administrator + * @create 2018-08-13 15:11 + */ + +public class KafkaLogNtc { + private static Logger logger = Logger.getLogger(KafkaLogNtc.class); + + /** + * kafka生产者,用于向kafka中发送消息 + */ + private static Producer kafkaProducer; + + /** + * kafka生产者适配器(单例),用来代理kafka生产者发送消息 + */ + private static KafkaLogNtc kafkaLogNtc; + + private KafkaLogNtc() { + initKafkaProducer(); + } + + public static KafkaLogNtc getInstance() { + if (kafkaLogNtc == null) { + kafkaLogNtc = new KafkaLogNtc(); + } + return kafkaLogNtc; + } + + + public void sendMessage(List list) { + final int[] errorSum = {0}; + for (String value : list) { + kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, value), new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + logger.error("写入" + FlowWriteConfig.RESULTS_OUTPUT_TOPIC + "出现异常", exception); + errorSum[0]++; + } + } + }); + if (errorSum[0] > FlowWriteConfig.MAX_FAILURE_NUM) { + logger.error("超出最大容忍错误数抛弃数据条数:" + list.size()); + list.clear(); + } + } + kafkaProducer.flush(); + logger.warn("Log sent to National Center successfully!!!!!"); + } + + /** + * 根据kafka生产者配置信息初始化kafka消息生产者,只初始化一次 + */ + private void initKafkaProducer() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", "1"); + properties.put("linger.ms", "2"); + properties.put("request.timeout.ms", 60000); + properties.put("batch.size", 262144); + properties.put("buffer.memory", 33554432); +// properties.put("compression.type", "snappy"); + kafkaProducer = new KafkaProducer<>(properties); + } + + +} diff --git a/src/main/java/cn/ac/iie/utils/redis/RedisClusterUtils.java b/src/main/java/cn/ac/iie/utils/redis/RedisClusterUtils.java new file mode 100644 index 0000000..e7f67d9 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/redis/RedisClusterUtils.java @@ -0,0 +1,79 @@ +package cn.ac.iie.utils.redis; + +import cn.ac.iie.common.FlowWriteConfig; +import org.apache.log4j.Logger; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPoolConfig; + +import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.Properties; +import java.util.Set; + +/** + * 预用于对准IP对应的用户名的 Redis连接池 + * + * @author my + * @date 2018-07-04 + */ +public final class RedisClusterUtils { + private static final Logger logger = Logger.getLogger(RedisClusterUtils.class); + private static JedisCluster jedisCluster; + private static Properties props = new Properties(); + + static { + try { + String redisConfigFile = "redis_config.properties"; + props.load(RedisClusterUtils.class.getClassLoader().getResourceAsStream(redisConfigFile)); + } catch (IOException e) { + props = null; + logger.error("加载Redis配置文件失败!", e); + } + } + + /** + * 不允许通过new创建该类的实例 + */ + private RedisClusterUtils() { + } + + /** + * 初始化Redis连接池 + */ + private static JedisCluster getJedisCluster() { + if (jedisCluster == null) { + JedisPoolConfig poolConfig = new JedisPoolConfig(); + poolConfig.setMaxTotal(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXACTIVE))); + poolConfig.setMaxIdle(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXIDLE))); + poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXWAIT))); + poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONRETURN))); + poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONBORROW))); + Set nodes = new LinkedHashSet(); + for (String port : props.getProperty(FlowWriteConfig.REDIS_PORT).split(FlowWriteConfig.SEGMENTATION)) { + for (String ip : props.getProperty(FlowWriteConfig.REDIS_IP).split(FlowWriteConfig.SEGMENTATION)) { + nodes.add(new HostAndPort(ip, Integer.parseInt(port))); + } + } + jedisCluster = new JedisCluster(nodes, poolConfig); + } + return jedisCluster; + } + + /** + * 获取用户名 + * + * @param key service_ip + * @return Subscribe_id + */ + public static String get(String key) { + String s = key.split("\\.")[0]; + if (!FlowWriteConfig.CHECK_IP_SCOPE.contains(s)) { + jedisCluster = getJedisCluster(); + return jedisCluster.get(key); + } + return ""; + } + + +} diff --git a/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java b/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java new file mode 100644 index 0000000..378bef5 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java @@ -0,0 +1,115 @@ +package cn.ac.iie.utils.redis; + +import cn.ac.iie.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import org.apache.commons.lang3.RandomUtils; +import org.apache.log4j.Logger; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; + +import java.util.Properties; + +/** + * @author qidaijie + */ +public class RedisPollUtils { + private static final Logger logger = Logger.getLogger(RedisPollUtils.class); + private static JedisPool jedisPool = null; + private static Properties props = new Properties(); + + + private RedisPollUtils() { + } + + static { + initialPool(); + + } + + /** + * 初始化Redis连接池 + */ + private static void initialPool() { + try { + //加载连接池配置文件 + props.load(RedisPollUtils.class.getClassLoader().getResourceAsStream("redis_config.properties")); + // 创建jedis池配置实例 + JedisPoolConfig poolConfig = new JedisPoolConfig(); + poolConfig.setMaxTotal(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXACTIVE))); + poolConfig.setMaxIdle(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXIDLE))); + poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXWAIT))); + poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONRETURN))); + poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONBORROW))); + // 根据配置实例化jedis池 + jedisPool = new JedisPool(poolConfig, props.getProperty(FlowWriteConfig.REDIS_IP), + Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_PORT))); + } catch (Exception e) { + logger.error("Redis连接池初始化错误", e); + } + } + + /** + * 获取Jedis实例 + * + * @return Jedis实例 + */ + public static Jedis getJedis() { + Jedis jedis = null; + try { + if (jedisPool == null) { + initialPool(); + } + jedis = jedisPool.getResource(); + } catch (Exception e) { + logger.error("Redis连接池错误,无法获取连接", e); + } + return jedis; + } + +// /** +// * @param key redis key +// * @return value +// */ +// public static Integer getWorkerId(String key) { +// int workId = 0; +// int maxId = 32; +// try (Jedis jedis = RedisPollUtils.getJedis()) { +// if (jedis != null) { +// String work = jedis.get(key); +// if (StringUtil.isBlank(work)) { +// jedis.set(key, "0"); +// } else { +// workId = Integer.parseInt(work); +// } +// if (workId < maxId) { +// jedis.set(key, String.valueOf(workId + 1)); +// } else { +// workId = 0; +// jedis.set(key, "1"); +// } +// } +// } catch (Exception e) { +// logger.error("通过Redis获取用户名出现异常", e); +// workId = RandomUtils.nextInt(0, 31); +// } +// return workId; +// } + + public static Integer getWorkerId(String key) { + int workId = 0; + try (Jedis jedis = RedisPollUtils.getJedis()) { + if (jedis != null) { + workId = Integer.parseInt(jedis.get(key)); + jedis.set(key, String.valueOf(workId + 2)); + logger.error("\n工作id是:" + workId + "\n"); + } + } catch (Exception e) { + logger.error("通过Redis获取用户名出现异常", e); + workId = RandomUtils.nextInt(0, 31); + } + return workId; + } + + +} diff --git a/src/main/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java b/src/main/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java new file mode 100644 index 0000000..273a5f8 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java @@ -0,0 +1,65 @@ +package cn.ac.iie.utils.system; + +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class FlowWriteConfigurations { + + // private static Properties propCommon = new Properties(); + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); +// } else if (type == 1) { +// return propCommon.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); +// } else if (type == 1) { +// return Integer.parseInt(propCommon.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); +// } else if (type == 1) { +// return Long.parseLong(propCommon.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return "true".equals(propService.getProperty(key).toLowerCase().trim()); +// } else if (type == 1) { +// return "true".equals(propCommon.getProperty(key).toLowerCase().trim()); + } else { + return null; + } + } + + static { + try { + propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + } catch (Exception e) { +// propCommon = null; + propService = null; + } + } +} diff --git a/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java new file mode 100644 index 0000000..6f11ddd --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/system/SnowflakeId.java @@ -0,0 +1,202 @@ +package cn.ac.iie.utils.system; + +import cn.ac.iie.common.FlowWriteConfig; +import cn.ac.iie.utils.kafka.KafkaLogNtc; +import cn.ac.iie.utils.redis.RedisPollUtils; +import cn.ac.iie.utils.zookeeper.ZookeeperUtils; +import org.apache.log4j.Logger; +import org.apache.zookeeper.ZooDefs; + +/** + * 雪花算法 + * + * @author qidaijie + */ +public class SnowflakeId { + private static Logger logger = Logger.getLogger(SnowflakeId.class); + + // ==============================Fields=========================================== + /** + * 开始时间截 (2018-08-01 00:00:00) max 17years + */ + private final long twepoch = 1564588800000L; + + /** + * 机器id所占的位数 + */ + private final long workerIdBits = 6L; + + /** + * 数据标识id所占的位数 + */ + private final long dataCenterIdBits = 4L; + + /** + * 支持的最大机器id,结果是3 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) + */ + private final long maxWorkerId = -1L ^ (-1L << workerIdBits); + + /** + * 支持的最大数据标识id,结果是15 + */ + private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits); + + /** + * 序列在id中占的位数 + */ + private final long sequenceBits = 14L; + + /** + * 机器ID向左移12位 + */ + private final long workerIdShift = sequenceBits; + + /** + * 数据标识id向左移17位(14+6) + */ + private final long dataCenterIdShift = sequenceBits + workerIdBits; + + /** + * 时间截向左移22位(4+6+14) + */ + private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits; + + /** + * 生成序列的掩码,这里为16383 + */ + private final long sequenceMask = -1L ^ (-1L << sequenceBits); + + /** + * 工作机器ID(0~63) + */ + private long workerId; + + /** + * 数据中心ID(0~15) + */ + private long dataCenterId; + + /** + * 毫秒内序列(0~16383) + */ + private long sequence = 0L; + + /** + * 上次生成ID的时间截 + */ + private long lastTimestamp = -1L; + + private static SnowflakeId idWorker; + + static { + ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); + zookeeperUtils.createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC,"0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); +// idWorker = new SnowflakeId(RedisPollUtils.getWorkerId(FlowWriteConfig.KAFKA_TOPIC), FlowWriteConfig.DATA_CENTER_ID_NUM); + idWorker = new SnowflakeId(zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC), FlowWriteConfig.DATA_CENTER_ID_NUM); +// idWorker = new SnowflakeId(12, FlowWriteConfig.DATA_CENTER_ID_NUM); + } + + //==============================Constructors===================================== + + /** + * 构造函数 + * + * @param workerId 工作ID (0~63) + * @param dataCenterId 数据中心ID (0~15) + */ + private SnowflakeId(int workerId, int dataCenterId) { + if (workerId > maxWorkerId || workerId < 0) { + throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId)); + } + if (dataCenterId > maxDataCenterId || dataCenterId < 0) { + throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", maxDataCenterId)); + } + this.workerId = workerId; + this.dataCenterId = dataCenterId; + } + + // ==============================Methods========================================== + + /** + * 获得下一个ID (该方法是线程安全的) + * + * @return SnowflakeId + */ + private synchronized long nextId() { + long timestamp = timeGen(); + + //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 + if (timestamp < lastTimestamp) { + throw new RuntimeException( + String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); + } + + //如果是同一时间生成的,则进行毫秒内序列 + if (lastTimestamp == timestamp) { + sequence = (sequence + 1) & sequenceMask; + //毫秒内序列溢出 + if (sequence == 0) { + //阻塞到下一个毫秒,获得新的时间戳 + timestamp = tilNextMillis(lastTimestamp); + } + } + //时间戳改变,毫秒内序列重置 + else { + sequence = 0L; + } + + //上次生成ID的时间截 + lastTimestamp = timestamp; + + //移位并通过或运算拼到一起组成64位的ID + return ((timestamp - twepoch) << timestampLeftShift) + | (dataCenterId << dataCenterIdShift) + | (workerId << workerIdShift) + | sequence; + } + + /** + * 阻塞到下一个毫秒,直到获得新的时间戳 + * + * @param lastTimestamp 上次生成ID的时间截 + * @return 当前时间戳 + */ + protected long tilNextMillis(long lastTimestamp) { + long timestamp = timeGen(); + while (timestamp <= lastTimestamp) { + timestamp = timeGen(); + } + return timestamp; + } + + /** + * 返回以毫秒为单位的当前时间 + * + * @return 当前时间(毫秒) + */ + protected long timeGen() { + return System.currentTimeMillis(); + } + + + /** + * 静态工具类 + * + * @return + */ + public static Long generateId() { + return idWorker.nextId(); + } + + public static void main(String[] args) { + while (true) { + try { + logger.info(SnowflakeId.generateId()); + + } catch (Exception e) { + e.printStackTrace(); + } + } + } + +} \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/system/TupleUtils.java b/src/main/java/cn/ac/iie/utils/system/TupleUtils.java new file mode 100644 index 0000000..53e14ca --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/system/TupleUtils.java @@ -0,0 +1,23 @@ +package cn.ac.iie.utils.system; + +import org.apache.storm.Constants; +import org.apache.storm.tuple.Tuple; + +/** + * 用于检测是否是系统发送的tuple + * + * @author Administrator + */ +public final class TupleUtils { + /** + * 判断是否系统自动发送的Tuple + * + * @param tuple 元组 + * @return true or false + */ + public static boolean isTick(Tuple tuple) { + return tuple != null + && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) + && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); + } +} diff --git a/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java new file mode 100644 index 0000000..5769144 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/zookeeper/ZookeeperUtils.java @@ -0,0 +1,123 @@ +package cn.ac.iie.utils.zookeeper; + +import cn.ac.iie.common.FlowWriteConfig; +import org.apache.commons.lang3.RandomUtils; +import org.apache.log4j.Logger; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * @author qidaijie + */ +public class ZookeeperUtils implements Watcher { + private static Logger logger = Logger.getLogger(ZookeeperUtils.class); + + private ZooKeeper zookeeper; + + private static final int SESSION_TIME_OUT = 20000; + + private CountDownLatch countDownLatch = new CountDownLatch(1); + + @Override + public void process(WatchedEvent event) { + if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { + countDownLatch.countDown(); + } + } + + + /** + * 修改节点信息 + * + * @param path 节点路径 + */ + public int modifyNode(String path) { + int workerId; + try { + connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS); + Stat stat = zookeeper.exists(path, true); + workerId = Integer.parseInt(getNodeDate(path)); + String result = String.valueOf(workerId + 2); + if (stat != null) { + zookeeper.setData(path, result.getBytes(), stat.getVersion()); + } else { + logger.error("Node does not exist!,Can't modify"); + } + } catch (KeeperException | InterruptedException e) { + e.printStackTrace(); + workerId = RandomUtils.nextInt(30, 63); + } finally { + closeConn(); + } + logger.error("工作ID是:" + workerId); + return workerId; + } + + /** + * 连接zookeeper + * + * @param host 地址 + */ + public void connectZookeeper(String host) { + try { + zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); + countDownLatch.await(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + + public void closeConn() { + try { + zookeeper.close(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + /** + * 获取节点内容 + * + * @param path 节点路径 + * @return 内容/异常null + */ + public String getNodeDate(String path) { + String result = null; + Stat stat = new Stat(); + try { + byte[] resByte = zookeeper.getData(path, true, stat); + result = new String(resByte); + } catch (KeeperException | InterruptedException e) { + logger.error("Get node information exception"); + e.printStackTrace(); + } + return result; + } + + /** + * @param path 节点创建的路径 + * @param date 节点所存储的数据的byte[] + * @param acls 控制权限策略 + */ + public void createNode(String path, byte[] date, List acls) { + try { + connectZookeeper(FlowWriteConfig.ZOOKEEPER_SERVERS); + Stat exists = zookeeper.exists(path, true); + if (exists == null) { + zookeeper.create(path, date, acls, CreateMode.PERSISTENT); + } else { + logger.warn("Node already exists!,Don't need to create"); + } + } catch (KeeperException | InterruptedException e) { + e.printStackTrace(); + } finally { + closeConn(); + } + } + +} diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties new file mode 100644 index 0000000..8945804 --- /dev/null +++ b/src/main/java/log4j.properties @@ -0,0 +1,23 @@ +#Log4j +log4j.rootLogger=info,console,file +# 控制台日志设置 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 文件日志设置 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=info +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#路径请用相对路径,做好相关测试输出到应用目下 +log4j.appender.file.file=galaxy-name.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +#MyBatis 配置,com.nis.web.dao是mybatis接口所在包 +log4j.logger.com.nis.web.dao=debug +#bonecp数据源配置 +log4j.category.com.jolbox=debug,console \ No newline at end of file diff --git a/src/test/java/cn/ac/iie/test/DomainUtils.java b/src/test/java/cn/ac/iie/test/DomainUtils.java new file mode 100644 index 0000000..e7bdf78 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/DomainUtils.java @@ -0,0 +1,37 @@ +package cn.ac.iie.test; + +import com.zdjizhi.utils.StringUtil; + +import javax.xml.bind.SchemaOutputResolver; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.LinkedList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class DomainUtils { + + private static Pattern pattern = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)"); + + public static void main(String[] args) { + System.out.println(getTopDomain("agoo-report.m.taobao.com")); + + } + + private static String getTopDomain(String url) { +// try { + //获取值转换为小写 +// String host = new URL(url).getHost().toLowerCase();//news.hexun.com +// Pattern pattern = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)"); + Matcher matcher = pattern.matcher(url); + if (matcher.find()){ + return matcher.group(); + } +// } catch (MalformedURLException e) { +// e.printStackTrace(); +// } + return null; + } + +} diff --git a/src/test/java/cn/ac/iie/test/SnowflakeIdWorker.java b/src/test/java/cn/ac/iie/test/SnowflakeIdWorker.java new file mode 100644 index 0000000..470284a --- /dev/null +++ b/src/test/java/cn/ac/iie/test/SnowflakeIdWorker.java @@ -0,0 +1,182 @@ +package cn.ac.iie.test; + +/** + * Twitter_Snowflake
+ * SnowFlake的结构如下(每部分用-分开):
+ * 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000
+ * 1位标识,由于long基本类型在Java中是带符号的,最高位是符号位,正数是0,负数是1,所以id一般是正数,最高位是0
+ * 41位时间截(毫秒级),注意,41位时间截不是存储当前时间的时间截,而是存储时间截的差值(当前时间截 - 开始时间截) + * 得到的值),这里的的开始时间截,一般是我们的id生成器开始使用的时间,由我们程序来指定的(如下下面程序IdWorker类的startTime属性)。41位的时间截,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69
+ * 10位的数据机器位,可以部署在1024个节点,包括5位datacenterId和5位workerId
+ * 12位序列,毫秒内的计数,12位的计数顺序号支持每个节点每毫秒(同一机器,同一时间截)产生4096个ID序号
+ * 加起来刚好64位,为一个Long型。
+ * SnowFlake的优点是,整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由数据中心ID和机器ID作区分),并且效率较高,经测试,SnowFlake每秒能够产生26万ID左右。 + */ +public class SnowflakeIdWorker { +// ==============================Fields=========================================== + /** + * 开始时间截 (2015-01-01) + */ + private final long twepoch = timeGen(); + + /** + * 机器id所占的位数 + */ + private final long workerIdBits = 5L; + + /** + * 数据标识id所占的位数 + */ + private final long datacenterIdBits = 5L; + + /** + * 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) + */ + private final long maxWorkerId = -1L ^ (-1L << workerIdBits); + + /** + * 支持的最大数据标识id,结果是31 + */ + private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits); + + /** + * 序列在id中占的位数 + */ + private final long sequenceBits = 12L; + + /** + * 机器ID向左移12位 + */ + private final long workerIdShift = sequenceBits; + + /** + * 数据标识id向左移17位(12+5) + */ + private final long datacenterIdShift = sequenceBits + workerIdBits; + + /** + * 时间截向左移22位(5+5+12) + */ + private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; + + /** + * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) + */ + private final long sequenceMask = -1L ^ (-1L << sequenceBits); + + /** + * 工作机器ID(0~31) + */ + private long workerId; + + /** + * 数据中心ID(0~31) + */ + private long datacenterId; + + /** + * 毫秒内序列(0~4095) + */ + private long sequence = 0L; + + /** + * 上次生成ID的时间截 + */ + private long lastTimestamp = -1L; + + //==============================Constructors===================================== + + /** + * 构造函数 + * + * @param workerId 工作ID (0~31) + * @param datacenterId 数据中心ID (0~31) + */ + public SnowflakeIdWorker(long workerId, long datacenterId) { + if (workerId > maxWorkerId || workerId < 0) { + throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); + } + if (datacenterId > maxDatacenterId || datacenterId < 0) { + throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId)); + } + this.workerId = workerId; + this.datacenterId = datacenterId; + } + + // ==============================Methods========================================== + + /** + * 获得下一个ID (该方法是线程安全的) + * + * @return SnowflakeId + */ + public synchronized long nextId() { + long timestamp = timeGen(); + + //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 + if (timestamp < lastTimestamp) { + throw new RuntimeException( + String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); + } + + //如果是同一时间生成的,则进行毫秒内序列 + if (lastTimestamp == timestamp) { + sequence = (sequence + 1) & sequenceMask; + //毫秒内序列溢出 + if (sequence == 0) { + //阻塞到下一个毫秒,获得新的时间戳 + timestamp = tilNextMillis(lastTimestamp); + } + } + //时间戳改变,毫秒内序列重置 + else { + sequence = 0L; + } + + //上次生成ID的时间截 + lastTimestamp = timestamp; + + //移位并通过或运算拼到一起组成64位的ID + return ((timestamp - twepoch) << timestampLeftShift) + | (datacenterId << datacenterIdShift) + | (workerId << workerIdShift) + | sequence; + } + + /** + * 阻塞到下一个毫秒,直到获得新的时间戳 + * + * @param lastTimestamp 上次生成ID的时间截 + * @return 当前时间戳 + */ + protected long tilNextMillis(long lastTimestamp) { + long timestamp = timeGen(); + while (timestamp <= lastTimestamp) { + timestamp = timeGen(); + } + return timestamp; + } + + /** + * 返回以毫秒为单位的当前时间 + * + * @return 当前时间(毫秒) + */ + protected long timeGen() { + return System.currentTimeMillis(); + } + + //==============================Test============================================= + + /** + * 测试 + */ + public static void main(String[] args) { + SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0); + for (int i = 0; i < 1000; i++) { + long id = idWorker.nextId(); +// System.out.println(Long.toBinaryString(id)); + System.out.println(id); + } + } +} \ No newline at end of file diff --git a/src/test/java/cn/ac/iie/test/TestThread.java b/src/test/java/cn/ac/iie/test/TestThread.java new file mode 100644 index 0000000..5938b8a --- /dev/null +++ b/src/test/java/cn/ac/iie/test/TestThread.java @@ -0,0 +1,49 @@ +package cn.ac.iie.test; + + +import cn.ac.iie.utils.system.SnowflakeId; +import cn.ac.iie.utils.zookeeper.ZookeeperUtils; + +class RunnableDemo implements Runnable { + private Thread t; + + private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); + + static { + zookeeperUtils.connectZookeeper("192.168.40.207:2181"); + } + + @Override + public void run() { + zookeeperUtils.modifyNode("/testNode/UID-TEST"); + System.out.println(zookeeperUtils.getNodeDate("/testNode/UID-TEST")); +// zookeeperUtils.closeConn(); + + } + + public void start() { + if (t == null) { + t = new Thread(this); + t.start(); + } + } +} + +public class TestThread { + public static void main(String[] args) { + RunnableDemo R1 = new RunnableDemo(); + RunnableDemo R2 = new RunnableDemo(); +// RunnableDemo R3 = new RunnableDemo(); +// RunnableDemo R4 = new RunnableDemo(); + R1.start(); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + R2.start(); +// R3.start(); +// R4.start(); + + } +} diff --git a/src/test/java/cn/ac/iie/test/URLUtil.java b/src/test/java/cn/ac/iie/test/URLUtil.java new file mode 100644 index 0000000..1b1e590 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/URLUtil.java @@ -0,0 +1,90 @@ +package cn.ac.iie.test; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Pattern; + +public class URLUtil { + + private final static Set PublicSuffixSet = new HashSet( + Arrays.asList(new String( + "com|org|net|gov|edu|co|tv|mobi|info|asia|xxx|onion|cn|com.cn|edu.cn|gov.cn|net.cn|org.cn|jp|kr|tw|com.hk|hk|com.hk|org.hk|se|com.se|org.se") + .split("\\|"))); + + private static Pattern IP_PATTERN = Pattern.compile("(\\d{1,3}\\.){3}(\\d{1,3})"); + + /** + * 获取url的顶级域名 + * + * @param url + * @return + */ + public static String getDomainName(URL url) { + String host = url.getHost(); + if (host.endsWith(".")) { + host = host.substring(0, host.length() - 1); + } + if (IP_PATTERN.matcher(host).matches()) { + return host; + } + + int index = 0; + String candidate = host; + for (; index >= 0; ) { + index = candidate.indexOf('.'); + String subCandidate = candidate.substring(index + 1); + if (PublicSuffixSet.contains(subCandidate)) { + return candidate; + } + candidate = subCandidate; + } + return candidate; + } + + /** + * 获取url的顶级域名 + * + * @param url + * @return + * @throws MalformedURLException + */ + public static String getDomainName(String url) throws MalformedURLException { + return getDomainName(new URL(url)); + } + + /** + * 判断两个url顶级域名是否相等 + * + * @param url1 + * @param url2 + * @return + */ + public static boolean isSameDomainName(URL url1, URL url2) { + return getDomainName(url1).equalsIgnoreCase(getDomainName(url2)); + } + + /** + * 判断两个url顶级域名是否相等 + * + * @param url1 + * @param url2 + * @return + * @throws MalformedURLException + */ + public static boolean isSameDomainName(String url1, String url2) + throws MalformedURLException { + return isSameDomainName(new URL(url1), new URL(url2)); + } + + public static void main(String[] args) throws Exception { +// String urlStr = "http://news.hexun.com/2017-09-23/190978248.html"; + String urlStr = "array703-prod.do.dsp.mp.microsoft.com"; + System.out.println(getDomainName(urlStr.replace("\uFEFF", ""))); + System.out.println(getDomainName(new URL(urlStr.replace("\uFEFF", "")))); + + } + +} diff --git a/src/test/java/cn/ac/iie/test/ZookeeperTest.java b/src/test/java/cn/ac/iie/test/ZookeeperTest.java new file mode 100644 index 0000000..fe4e2b3 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/ZookeeperTest.java @@ -0,0 +1,126 @@ +package cn.ac.iie.test; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +public class ZookeeperTest implements Watcher { + private static Logger logger = Logger.getLogger(ZookeeperTest.class); + private static ZooKeeper zookeeper; + + private static final int SESSION_TIME_OUT = 2000; + +// private static Stat stat = new Stat(); + + private CountDownLatch countDownLatch = new CountDownLatch(1); + + public void process(WatchedEvent event) { + if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { + countDownLatch.countDown(); + } + } + + /** + * 连接zookeeper + * + * @param host 地址 + */ + private void connectZookeeper(String host) { + try { + zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); + countDownLatch.await(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + + + /** + * @param path 路径 + * @return 子节点 + */ + private List getChildren(String path) { + try { + return zookeeper.getChildren(path, false); + } catch (KeeperException | InterruptedException e) { + e.printStackTrace(); + return null; + } + } + + /** + * @param path 节点创建的路径 + * @param date 节点所存储的数据的byte[] + * @param acls 控制权限策略 + */ + private void createNode(String path, byte[] date, List acls) { + try { + Stat exists = zookeeper.exists(path, true); + if (exists == null) { + zookeeper.create(path, date, acls, CreateMode.PERSISTENT); + } else { + logger.warn("Node already exists!,Don't need to create"); + } + } catch (KeeperException | InterruptedException e) { + e.printStackTrace(); + } + } + + /** + * 修改节点信息 + * + * @param path 节点路径 + * @param date 修改的数据 + */ + private void modifyNode(String path, byte[] date) { + try { + Stat stat = zookeeper.exists(path, true); + if (stat != null) { + zookeeper.setData(path, date, stat.getVersion()); + } else { + logger.error("Node does not exist!,Can't modify"); + } + } catch (KeeperException | InterruptedException e) { + e.printStackTrace(); + } + } + + /** + * 获取节点内容 + * + * @param path 节点路径 + * @return 内容/异常null + */ + private String getNodeDate(String path) { + String result = null; + Stat stat = new Stat(); + try { + byte[] resByte = zookeeper.getData(path, true, stat); + result = new String(resByte); + } catch (KeeperException | InterruptedException e) { + logger.error("Get node information exception"); + e.printStackTrace(); + } + return result; + } + + + public static void main(String[] args) { + ZookeeperTest zookeeperTest = new ZookeeperTest(); + try { + zookeeperTest.connectZookeeper("192.168.40.207:2181"); + zookeeperTest.createNode("/Snowflake", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE); +// System.out.println(zookeeperTest.getNodeDate("/testNode/UID-TEST")); +// zookeeperTest.modifyNode("/testNode/UID-TEST", "2".getBytes()); +// System.out.println(zookeeperTest.getNodeDate("/testNode/UID-TEST")); + + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/java/cn/ac/iie/test/a.xml b/src/test/java/cn/ac/iie/test/a.xml new file mode 100644 index 0000000..5e857b3 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/a.xml @@ -0,0 +1,12 @@ + + + 1 + + false + + {ip} + 9001 + default + {rootpassword} + + diff --git a/src/test/java/cn/ac/iie/test/influxQueryTest.java b/src/test/java/cn/ac/iie/test/influxQueryTest.java new file mode 100644 index 0000000..7bf5497 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/influxQueryTest.java @@ -0,0 +1,52 @@ +package cn.ac.iie.test; + +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Pong; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class influxQueryTest { + private static InfluxDB client; + + public static void main(String[] args) { + + Query query = new Query("select * from test", "deltest"); + QueryResult a = getClient().query(query); + +// List lists = new ArrayList(); + for (QueryResult.Result result : a.getResults()) { + List series = result.getSeries(); + for (QueryResult.Series serie : series) { + List> values = serie.getValues();//字段字集合 + List colums = serie.getColumns();//字段名 + System.out.println("colums:" + colums); + for (List n : values) { + System.out.println("value:" + n); + } +//lists.addAll(getQueryData(colums,values)); + } + System.out.println("数据长度:" + series.size()); + } + } + + /** + * 链接时序数据库,获得InfluDB + */ + public static InfluxDB getClient() { + client = InfluxDBFactory.connect("http://192.168.40.207:8086", "test", "123456"); + Pong pong = client.ping(); + if (pong != null) { + System.out.println("Influx数据库连接成功"); + } else { + return null; + } +// client.createDatabase("testDB"); + return client; + } +} diff --git a/src/test/java/cn/ac/iie/test/test.java b/src/test/java/cn/ac/iie/test/test.java new file mode 100644 index 0000000..9d2d332 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/test.java @@ -0,0 +1,46 @@ +package cn.ac.iie.test; + +import cn.ac.iie.bean.SessionRecordLog; +import cn.ac.iie.common.FlowWriteConfig; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.IpLookup; +import org.junit.Test; + +import javax.servlet.http.HttpServletRequest; +import java.math.BigInteger; +import java.util.Arrays; + +public class test { + public static void main(String[] args) { + String message = "{\"str_ea_m-t-r-a-ceid\":\"JSON\"}"; + SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class); + System.out.println(sessionRecordLog.getStream_trace_id()); + + String message2 = "{\"streamtraceid\":\"JSON\"}"; + SessionRecordLog sessionRecordLog2 = JSONObject.parseObject(message2, SessionRecordLog.class); + System.out.println(sessionRecordLog2.getStream_trace_id()); + + JSONObject jsonObject = JSON.parseObject(message); + System.out.println("\n" + Arrays.toString(jsonObject.keySet().toArray())); + + HttpServletRequest request = null; + if (request != null) { + String contextPath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + request.getContextPath(); + } + + System.out.println(System.currentTimeMillis() / 1000); + } + + @Test + public void test2() { +// String minTimeStampStr = "00000000000000000000000000000000000000000"; + String minTimeStampStr = "000000000000000000000000000000000000000"; + long minTimeStamp = new BigInteger(minTimeStampStr, 2).longValue(); +// String maxTimeStampStr = "11111111111111111111111111111111111111111"; + String maxTimeStampStr = "111111111111111111111111111111111111111"; + long maxTimeStamp = new BigInteger(maxTimeStampStr, 2).longValue(); + long oneYearMills = 1L * 1000 * 60 * 60 * 24 * 365; + System.out.println((maxTimeStamp - minTimeStamp) / oneYearMills); + } +}