Revert "Merge branch 'tsg_galaxy_com_schema_v3.0.20191217' into 'master'"

This reverts merge request !2
李玺康
2020-02-25 15:12:56 +08:00
parent 2c448492c6
commit 94f14a0c59
39 changed files with 1441 additions and 1860 deletions
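For reference, a merge commit has two parents, so reverting it requires choosing a mainline; a minimal sketch of how such a revert is typically produced with the git CLI (the placeholder hash stands in for the merge commit of !2, which is not shown here):

    git revert -m 1 <merge-commit-hash>   # keep parent 1 (master) as the mainline
    git push origin master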


@@ -0,0 +1,672 @@
package cn.ac.iie.bean;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.support.spring.annotation.FastJsonFilter;
/**
* @author qidaijie
*/
public class SessionRecordLog {
private long uid;
private int policy_id;
private long action;
private int start_time;
private int end_time;
private long recv_time;
private String trans_proto;
private String app_proto;
private int addr_type;
private String server_ip;
private String client_ip;
private int server_port;
private int client_port;
private int service;
private int entrance_id;
private int device_id;
private int Link_id;
private String isp;
private int encap_type;
private int direction;
private int stream_dir;
private String cap_ip;
private String addr_list;
private String server_location;
private String client_location;
private String client_asn;
private String server_asn;
private String subscribe_id;
private long con_duration_ms;
private String url;
private String host;
private String domain;
private String category;
private String req_line;
private String res_line;
private String cookie;
private String referer;
private String user_agent;
private String content_len;
private String content_type;
private String set_cookie;
private String req_header;
private String resp_header;
private String req_body_key;
private String req_body;
private String res_body_key;
private String resp_body;
private String version;
private String sni;
private String san;
private String cn;
private int app_id;
private int protocol_id;
private long con_latency_ms;
private int pinningst;
private int intercept_state;
private long ssl_server_side_latency;
private long ssl_client_side_latency;
private String ssl_server_side_version;
private String ssl_client_side_version;
private int ssl_cert_verify;
private String stream_trace_id;
private String ssl_error;
private long c2s_pkt_num;
private long S2c_pkt_num;
private long c2s_byte_num;
private long s2c_byte_num;
private String nas_ip;
private String framed_ip;
private String account;
private int packet_type;
private int has_dup_traffic;
private String stream_error;
public SessionRecordLog() {
}
public long getUid() {
return uid;
}
public void setUid(long uid) {
this.uid = uid;
}
public int getPolicy_id() {
return policy_id;
}
public void setPolicy_id(int policy_id) {
this.policy_id = policy_id;
}
public long getAction() {
return action;
}
public void setAction(long action) {
this.action = action;
}
public int getStart_time() {
return start_time;
}
public void setStart_time(int start_time) {
this.start_time = start_time;
}
public int getEnd_time() {
return end_time;
}
public void setEnd_time(int end_time) {
this.end_time = end_time;
}
public String getSsl_error() {
return ssl_error;
}
public void setSsl_error(String ssl_error) {
this.ssl_error = ssl_error;
}
public String getApp_proto() {
return app_proto;
}
public void setApp_proto(String app_proto) {
this.app_proto = app_proto;
}
public long getRecv_time() {
return recv_time;
}
public void setRecv_time(long recv_time) {
this.recv_time = recv_time;
}
public String getTrans_proto() {
return trans_proto;
}
public void setTrans_proto(String trans_proto) {
this.trans_proto = trans_proto;
}
public int getAddr_type() {
return addr_type;
}
public void setAddr_type(int addr_type) {
this.addr_type = addr_type;
}
public String getServer_ip() {
return server_ip;
}
public void setServer_ip(String server_ip) {
this.server_ip = server_ip;
}
public String getClient_ip() {
return client_ip;
}
public void setClient_ip(String client_ip) {
this.client_ip = client_ip;
}
public int getServer_port() {
return server_port;
}
public void setServer_port(int server_port) {
this.server_port = server_port;
}
public int getClient_port() {
return client_port;
}
public void setClient_port(int client_port) {
this.client_port = client_port;
}
public int getService() {
return service;
}
public void setService(int service) {
this.service = service;
}
public int getEntrance_id() {
return entrance_id;
}
public void setEntrance_id(int entrance_id) {
this.entrance_id = entrance_id;
}
public int getDevice_id() {
return device_id;
}
public void setDevice_id(int device_id) {
this.device_id = device_id;
}
public int getLink_id() {
return Link_id;
}
public void setLink_id(int link_id) {
Link_id = link_id;
}
public String getIsp() {
return isp;
}
public void setIsp(String isp) {
this.isp = isp;
}
public int getEncap_type() {
return encap_type;
}
public void setEncap_type(int encap_type) {
this.encap_type = encap_type;
}
public int getDirection() {
return direction;
}
public void setDirection(int direction) {
this.direction = direction;
}
public int getStream_dir() {
return stream_dir;
}
public void setStream_dir(int stream_dir) {
this.stream_dir = stream_dir;
}
public String getCap_ip() {
return cap_ip;
}
public void setCap_ip(String cap_ip) {
this.cap_ip = cap_ip;
}
public String getAddr_list() {
return addr_list;
}
public void setAddr_list(String addr_list) {
this.addr_list = addr_list;
}
public String getServer_location() {
return server_location;
}
public void setServer_location(String server_location) {
this.server_location = server_location;
}
public String getClient_location() {
return client_location;
}
public void setClient_location(String client_location) {
this.client_location = client_location;
}
public String getClient_asn() {
return client_asn;
}
public void setClient_asn(String client_asn) {
this.client_asn = client_asn;
}
public String getServer_asn() {
return server_asn;
}
public void setServer_asn(String server_asn) {
this.server_asn = server_asn;
}
public String getSubscribe_id() {
return subscribe_id;
}
public void setSubscribe_id(String subscribe_id) {
this.subscribe_id = subscribe_id;
}
public long getCon_duration_ms() {
return con_duration_ms;
}
public void setCon_duration_ms(long con_duration_ms) {
this.con_duration_ms = con_duration_ms;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getDomain() {
return domain;
}
public void setDomain(String domain) {
this.domain = domain;
}
public String getCategory() {
return category;
}
public void setCategory(String category) {
this.category = category;
}
public String getReq_line() {
return req_line;
}
public void setReq_line(String req_line) {
this.req_line = req_line;
}
public String getRes_line() {
return res_line;
}
public void setRes_line(String res_line) {
this.res_line = res_line;
}
public String getCookie() {
return cookie;
}
public void setCookie(String cookie) {
this.cookie = cookie;
}
public String getReferer() {
return referer;
}
public void setReferer(String referer) {
this.referer = referer;
}
public String getUser_agent() {
return user_agent;
}
public void setUser_agent(String user_agent) {
this.user_agent = user_agent;
}
public String getContent_len() {
return content_len;
}
public void setContent_len(String content_len) {
this.content_len = content_len;
}
public String getContent_type() {
return content_type;
}
public void setContent_type(String content_type) {
this.content_type = content_type;
}
public String getSet_cookie() {
return set_cookie;
}
public void setSet_cookie(String set_cookie) {
this.set_cookie = set_cookie;
}
public String getReq_header() {
return req_header;
}
public void setReq_header(String req_header) {
this.req_header = req_header;
}
public String getResp_header() {
return resp_header;
}
public void setResp_header(String resp_header) {
this.resp_header = resp_header;
}
public String getReq_body_key() {
return req_body_key;
}
public void setReq_body_key(String req_body_key) {
this.req_body_key = req_body_key;
}
public String getReq_body() {
return req_body;
}
public void setReq_body(String req_body) {
this.req_body = req_body;
}
public String getRes_body_key() {
return res_body_key;
}
public void setRes_body_key(String res_body_key) {
this.res_body_key = res_body_key;
}
public String getResp_body() {
return resp_body;
}
public void setResp_body(String resp_body) {
this.resp_body = resp_body;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getSni() {
return sni;
}
public void setSni(String sni) {
this.sni = sni;
}
public String getSan() {
return san;
}
public void setSan(String san) {
this.san = san;
}
public String getCn() {
return cn;
}
public void setCn(String cn) {
this.cn = cn;
}
public int getApp_id() {
return app_id;
}
public void setApp_id(int app_id) {
this.app_id = app_id;
}
public int getProtocol_id() {
return protocol_id;
}
public void setProtocol_id(int protocol_id) {
this.protocol_id = protocol_id;
}
public int getIntercept_state() {
return intercept_state;
}
public void setIntercept_state(int intercept_state) {
this.intercept_state = intercept_state;
}
public long getSsl_server_side_latency() {
return ssl_server_side_latency;
}
public void setSsl_server_side_latency(long ssl_server_side_latency) {
this.ssl_server_side_latency = ssl_server_side_latency;
}
public long getSsl_client_side_latency() {
return ssl_client_side_latency;
}
public void setSsl_client_side_latency(long ssl_client_side_latency) {
this.ssl_client_side_latency = ssl_client_side_latency;
}
public String getSsl_server_side_version() {
return ssl_server_side_version;
}
public void setSsl_server_side_version(String ssl_server_side_version) {
this.ssl_server_side_version = ssl_server_side_version;
}
public String getSsl_client_side_version() {
return ssl_client_side_version;
}
public void setSsl_client_side_version(String ssl_client_side_version) {
this.ssl_client_side_version = ssl_client_side_version;
}
public int getSsl_cert_verify() {
return ssl_cert_verify;
}
public void setSsl_cert_verify(int ssl_cert_verify) {
this.ssl_cert_verify = ssl_cert_verify;
}
public String getStream_trace_id() {
return stream_trace_id;
}
public void setStream_trace_id(String stream_trace_id) {
this.stream_trace_id = stream_trace_id;
}
public long getCon_latency_ms() {
return con_latency_ms;
}
public void setCon_latency_ms(long con_latency_ms) {
this.con_latency_ms = con_latency_ms;
}
public int getPinningst() {
return pinningst;
}
public void setPinningst(int pinningst) {
this.pinningst = pinningst;
}
public long getC2s_pkt_num() {
return c2s_pkt_num;
}
public void setC2s_pkt_num(long c2s_pkt_num) {
this.c2s_pkt_num = c2s_pkt_num;
}
public long getS2c_pkt_num() {
return S2c_pkt_num;
}
public void setS2c_pkt_num(long s2c_pkt_num) {
S2c_pkt_num = s2c_pkt_num;
}
public long getC2s_byte_num() {
return c2s_byte_num;
}
public void setC2s_byte_num(long c2s_byte_num) {
this.c2s_byte_num = c2s_byte_num;
}
public long getS2c_byte_num() {
return s2c_byte_num;
}
public void setS2c_byte_num(long s2c_byte_num) {
this.s2c_byte_num = s2c_byte_num;
}
public String getNas_ip() {
return nas_ip;
}
public void setNas_ip(String nas_ip) {
this.nas_ip = nas_ip;
}
public String getFramed_ip() {
return framed_ip;
}
public void setFramed_ip(String framed_ip) {
this.framed_ip = framed_ip;
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public int getPacket_type() {
return packet_type;
}
public void setPacket_type(int packet_type) {
this.packet_type = packet_type;
}
public int getHas_dup_traffic() {
return has_dup_traffic;
}
public void setHas_dup_traffic(int has_dup_traffic) {
this.has_dup_traffic = has_dup_traffic;
}
public String getStream_error() {
return stream_error;
}
public void setStream_error(String stream_error) {
this.stream_error = stream_error;
}
}
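Since SessionRecordLog is populated from Kafka messages with fastjson later in this diff, a minimal round-trip sketch may help; the JSON keys mirror the bean's field names, and the sample values are made up:

import com.alibaba.fastjson.JSONObject;

public class SessionRecordLogDemo {
    public static void main(String[] args) {
        // fastjson binds JSON keys to the bean's setters
        String raw = "{\"server_ip\":\"1.2.3.4\",\"client_ip\":\"5.6.7.8\",\"server_port\":443}";
        SessionRecordLog log = JSONObject.parseObject(raw, SessionRecordLog.class);
        log.setDomain("example.com");   // enrichment step, as done in TransFormUtils below
        System.out.println(JSONObject.toJSONString(log));
    }
}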


@@ -1,6 +1,5 @@
package cn.ac.iie.bolt.radius;
package cn.ac.iie.bolt;
import cn.ac.iie.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
@@ -13,17 +12,16 @@ import org.apache.storm.tuple.Values;
import java.util.Map;
import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage;
import static cn.ac.iie.utils.general.TransFormUtils.getJsonMessage;
/**
* Connection-record log enrichment
*
* @author qidaijie
*/
public class RadiusCompletionBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(RadiusCompletionBolt.class);
private static final long serialVersionUID = -3657802387129063952L;
public class ConnCompletionBolt extends BaseBasicBolt {
private static final long serialVersionUID = -1059151670138465894L;
private final static Logger logger = Logger.getLogger(ConnCompletionBolt.class);
@Override
public void prepare(Map stormConf, TopologyContext context) {
@@ -35,15 +33,13 @@ public class RadiusCompletionBolt extends BaseBasicBolt {
try {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(dealCommonMessage(message)));
basicOutputCollector.emit(new Values(getJsonMessage(message)));
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while receiving/parsing");
e.printStackTrace();
logger.error("Exception while receiving/parsing", e);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("connLog"));


@@ -24,11 +24,11 @@ import java.util.Map;
* @date 2018/8/14
*/
public class NtcLogSendBolt extends BaseBasicBolt {
private static final long serialVersionUID = -3663610927224396615L;
private static final long serialVersionUID = 3940515789830317517L;
private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
private List<String> list;
private KafkaLogNtc kafkaLogNtc;
// private static long successfulSum = 0;
private static long successfulSum = 0;
@Override
@@ -43,11 +43,11 @@ public class NtcLogSendBolt extends BaseBasicBolt {
if (TupleUtils.isTick(tuple)) {
if (list.size() != 0) {
kafkaLogNtc.sendMessage(list);
// successfulSum += list.size();
successfulSum += list.size();
list.clear();
}
// basicOutputCollector.emit(new Values(successfulSum));
// successfulSum = 0L;
basicOutputCollector.emit(new Values(successfulSum));
successfulSum = 0L;
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
@@ -55,12 +55,12 @@ public class NtcLogSendBolt extends BaseBasicBolt {
}
if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) {
kafkaLogNtc.sendMessage(list);
// successfulSum += list.size();
successfulSum += list.size();
list.clear();
}
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while sending logs to Kafka");
logger.error("Exception while sending logs to Kafka", e);
e.printStackTrace();
}
}
@@ -74,7 +74,7 @@ public class NtcLogSendBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// outputFieldsDeclarer.declare(new Fields("suc"));
outputFieldsDeclarer.declare(new Fields("suc"));
}
}


@@ -1,68 +0,0 @@
package cn.ac.iie.bolt.collect;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.system.TupleUtils;
import com.zdjizhi.utils.HBaseUtils;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage;
/**
* Connection-record log enrichment
*
* @author qidaijie
*/
public class CollectCompletedBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(CollectCompletedBolt.class);
private static final long serialVersionUID = 4682827168247333522L;
@Override
public void prepare(Map stormConf, TopologyContext context) {
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
if (TupleUtils.isTick(tuple)) {
HBaseUtils.change();
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(dealCommonMessage(message)));
}
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while receiving/parsing");
e.printStackTrace();
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("connLog"));
}
}


@@ -1,66 +0,0 @@
package cn.ac.iie.bolt.proxy;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.system.TupleUtils;
import com.zdjizhi.utils.HBaseUtils;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage;
/**
* Connection-record log enrichment
*
* @author qidaijie
*/
public class ProxyCompletionBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(ProxyCompletionBolt.class);
private static final long serialVersionUID = 6097654428594885032L;
@Override
public void prepare(Map stormConf, TopologyContext context) {
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
if (TupleUtils.isTick(tuple)) {
HBaseUtils.change();
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(dealCommonMessage(message)));
}
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while receiving/parsing");
e.printStackTrace();
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("connLog"));
}
}


@@ -1,68 +0,0 @@
package cn.ac.iie.bolt.security;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.system.TupleUtils;
import com.zdjizhi.utils.HBaseUtils;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage;
/**
* Connection-record log enrichment
*
* @author qidaijie
*/
public class SecurityCompletionBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(SecurityCompletionBolt.class);
private static final long serialVersionUID = -2380858260054733989L;
@Override
public void prepare(Map stormConf, TopologyContext context) {
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
if (TupleUtils.isTick(tuple)) {
HBaseUtils.change();
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(dealCommonMessage(message)));
}
}
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while receiving/parsing");
e.printStackTrace();
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("connLog"));
}
}


@@ -8,9 +8,10 @@ import cn.ac.iie.utils.system.FlowWriteConfigurations;
*/
public class FlowWriteConfig {
public static final int IPV4_TYPE = 1;
public static final int IPV6_TYPE = 2;
public static final String DOMAIN_SPLITTER = ".";
public static final String LOG_STRING_SPLITTER = "\t";
public static final String SQL_STRING_SPLITTER = "#";
public static final String SEGMENTATION = ",";
/**
* System
*/
@@ -19,7 +20,6 @@ public class FlowWriteConfig {
public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers");
public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks");
public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time");
@@ -40,8 +40,6 @@ public class FlowWriteConfig {
*/
public static final String BOOTSTRAP_SERVERS = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(0, "hbase.table.name");
public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
public static final String RESULTS_OUTPUT_TOPIC = FlowWriteConfigurations.getStringProperty(0, "results.output.topic");
public static final String KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "kafka.topic");
@@ -49,9 +47,17 @@ public class FlowWriteConfig {
public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
/**
* http
/***
* Redis
*/
public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");
public static final String REDIS_IP = "redis.ip";
public static final String REDIS_PORT = "redis.port";
public static final String REDIS_TIMEOUT = "redis.timeout";
public static final String REDIS_POOL_MAXACTIVE = "redis.pool.maxActive";
public static final String REDIS_POOL_MAXIDLE = "redis.pool.maxIdle";
public static final String REDIS_POOL_MAXWAIT = "redis.pool.maxWait";
public static final String REDIS_POOL_TESTONBORROW = "redis.pool.testOnBorrow";
public static final String REDIS_POOL_TESTONRETURN = "redis.pool.testOnReturn";
}
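The REDIS_* constants above are property keys, resolved against redis_config.properties by the Redis utility classes later in this diff; a hypothetical example of that file, with made-up values:

redis.ip=192.168.1.10
redis.port=6379
redis.timeout=3000
redis.pool.maxActive=50
redis.pool.maxIdle=10
redis.pool.maxWait=2000
redis.pool.testOnBorrow=true
redis.pool.testOnReturn=false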


@@ -37,7 +37,6 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
props.put("max.poll.records", 3000);
props.put("max.partition.fetch.bytes", 31457280);
props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;


@@ -1,13 +1,9 @@
package cn.ac.iie.topology;
import cn.ac.iie.bolt.ConnCompletionBolt;
import cn.ac.iie.bolt.NtcLogSendBolt;
import cn.ac.iie.bolt.collect.CollectCompletedBolt;
import cn.ac.iie.bolt.radius.RadiusCompletionBolt;
import cn.ac.iie.bolt.security.SecurityCompletionBolt;
import cn.ac.iie.bolt.proxy.ProxyCompletionBolt;
import cn.ac.iie.bolt.SummaryBolt;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.spout.CustomizedKafkaSpout;
import org.apache.log4j.Logger;
@@ -62,28 +58,9 @@ public class LogFlowWriteTopology {
private void buildTopology() {
builder = new TopologyBuilder();
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
switch (FlowWriteConfig.KAFKA_TOPIC) {
case "PROXY-EVENT-LOG":
builder.setBolt("ProxyCompletionBolt", new ProxyCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ProxyCompletionBolt");
break;
case "RADIUS-RECORD-LOG":
builder.setBolt("RadiusCompletionBolt", new RadiusCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("RadiusCompletionBolt");
break;
case "CONNECTION-RECORD-LOG":
builder.setBolt("CollectCompletedBolt", new CollectCompletedBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("CollectCompletedBolt");
break;
case "SECURITY-EVENT-LOG":
builder.setBolt("SecurityCompletionBolt", new SecurityCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("SecurityCompletionBolt");
break;
default:
}
builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt");
// builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt");
}
public static void main(String[] args) throws Exception {


@@ -1,18 +1,19 @@
package cn.ac.iie.utils.general;
import cn.ac.iie.bean.SessionRecordLog;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.hbase.HBaseUtils;
import cn.ac.iie.utils.json.JsonParseUtil;
import cn.ac.iie.utils.redis.RedisPollUtils;
import cn.ac.iie.utils.system.SnowflakeId;
import cn.ac.iie.utils.zookeeper.DistributedLock;
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
import com.alibaba.fastjson.JSONObject;
import com.google.common.net.InternetDomainName;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
@@ -21,179 +22,102 @@ import java.util.*;
* @author qidaijie
* @create 2018-08-13 15:11
*/
public class TransFormUtils {
private static Logger logger = Logger.getLogger(TransFormUtils.class);
// map of field name to type, used to build the reflected class in memory
private static HashMap<String, Class> map = JsonParseUtil.getMapFromhttp(FlowWriteConfig.SCHEMA_HTTP);
// reflect the map into a class instance
private static Object mapObject = JsonParseUtil.generateObject(map);
// fetch the job list
private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
// enrichment helper
private static FormatUtils build = new FormatUtils.Builder(false).build();
// IP geolocation library helper
private static Pattern WEB_PATTERN = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)");
private static IpLookup ipLookup = new IpLookup.Builder(false)
.loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
.build();
// private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
// private static SnowflakeId snowflakeId = new SnowflakeId();
/**
* Parses a log line and enriches it.
*
* @param message raw log from the Kafka topic
* @param message raw log
* @return the enriched log
*/
public static String dealCommonMessage(String message) {
Object object = JSONObject.parseObject(message, mapObject.getClass());
// System.out.println("补全之前 ===》 "+JSON.toJSONString(object));
public static String getJsonMessage(String message) {
SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class);
String serverIp = sessionRecordLog.getServer_ip();
String clientIp = sessionRecordLog.getClient_ip();
try {
for (String[] strings : jobList) {
// value of the source field
Object name = JsonParseUtil.getValue(object, strings[0]);
// current value of the field to be filled
Object appendTo = JsonParseUtil.getValue(object, strings[1]);
// name of the operation function to apply
String function=strings[2];
// value of the extra parameter
Object param = null;
if (strings[3] != null){
param=JsonParseUtil.getValue(object, strings[3]);
}
if (function.equals("current_timestamp")) {
JsonParseUtil.setValue(object, strings[1], getCurrentTime());
} else if (function.equals("snowflake_id")) {
JsonParseUtil.setValue(object, strings[1], build.getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS,FlowWriteConfig.KAFKA_TOPIC,FlowWriteConfig.DATA_CENTER_ID_NUM));
} else if (function.equals("geo_ip_detail")) {
JsonParseUtil.setValue(object, strings[1], getGeoIpDetail(name.toString()));
} else if (function.equals("geo_asn")) {
JsonParseUtil.setValue(object, strings[1], getGeoAsn(name.toString()));
} else if (function.equals("radius_match")) {
JsonParseUtil.setValue(object, strings[1], radiusMatch(name.toString()));
} else if (function.equals("geo_ip_country")) {
JsonParseUtil.setValue(object, strings[1], getGeoIpCountry(name.toString()));
} else if (function.equals("decode_of_base64") && param != null){
JsonParseUtil.setValue(object, strings[1], FormatUtils.base64Str(name.toString(),param.toString()));
} else if (name.equals("http_host") && function.equals("sub_domain")) {
if (appendTo == null || StringUtil.isBlank(appendTo.toString())) {
JsonParseUtil.setValue(object, strings[1], getTopDomain(null, name.toString()));
}
} else if (name.equals("ssl_sni") && strings[2].equals("sub_domain")) {
if (appendTo == null || StringUtil.isBlank(appendTo.toString())) {
JsonParseUtil.setValue(object, strings[1], getTopDomain(name.toString(), null));
}
}
}
return JSONObject.toJSONString(object);
// System.out.println("补全之后 ===》 "+JSON.toJSONString(object));
sessionRecordLog.setUid(SnowflakeId.generateId());
sessionRecordLog.setServer_location(ipLookup.countryLookup(serverIp));
sessionRecordLog.setClient_location(ipLookup.cityLookupDetail(clientIp));
sessionRecordLog.setClient_asn(ipLookup.asnLookup(clientIp, true));
sessionRecordLog.setServer_asn(ipLookup.asnLookup(serverIp, true));
sessionRecordLog.setDomain(getTopDomain(sessionRecordLog.getSni(), sessionRecordLog.getHost()));
sessionRecordLog.setRecv_time(System.currentTimeMillis() / 1000);
// sessionRecordLog.setSubscribe_id(getSubscribeId(clientIp));
return JSONObject.toJSONString(sessionRecordLog);
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while parsing log");
e.printStackTrace();
logger.error("Exception while parsing log", e);
return "";
}
}
// @Test
// public void aaa() {
// String sni = "www.baidu.com";
// System.out.println(getTopDomain(sni, null));
// System.out.println(getTopDomain(null,sni));
//
// }
/**
* If sni is present, derive the domain from sni; otherwise derive it from host.
*
* @param sni sni
* @param host host
* @return top-level domain
*/
private static String getTopDomain(String sni, String host) {
if (StringUtil.isNotBlank(host)) {
return getDomainName(host);
} else if (StringUtil.isNotBlank(sni)) {
return getDomainName(sni);
if (StringUtil.isNotBlank(sni)) {
return getDomain(sni);
} else if (StringUtil.isNotBlank(host)) {
return getDomain(host);
} else {
return "";
}
}
/**
* Gets the subscriber name.
*
* @param key Sip
* @return SubscribeId
*/
private static String getSubscribeId(String key) {
String sub = "";
try (Jedis jedis = RedisPollUtils.getJedis()) {
if (jedis != null) {
sub = jedis.get(key);
}
} catch (Exception e) {
logger.error("通过Redis获取用户名出现异常", e);
}
return sub;
}
/**
* Extracts the top-level domain from a URL.
*
* @param host site URL
* @param url site URL
* @return top-level domain
*/
private static String getDomainName(String host) {
String domain = "";
private static String getDomain(String url) {
try {
domain = InternetDomainName.from(host).topPrivateDomain().toString();
Matcher matcher = WEB_PATTERN.matcher(url);
if (matcher.find()) {
return matcher.group();
}
} catch (Exception e) {
logger.error("host解析顶级域名异常: " + e.getMessage());
e.printStackTrace();
}
return domain;
return "";
}
/**
* Returns the current timestamp in epoch seconds.
*/
private static int getCurrentTime() {
return (int)(System.currentTimeMillis() / 1000);
public static void main(String[] args) {
String s = ipLookup.countryLookup("192.168.10.207");
System.out.println(s);
}
/**
* Looks up location details for a client IP.
*
* @param ip
* @return
*/
private static String getGeoIpDetail(String ip) {
return ipLookup.cityLookupDetail(ip);
}
/**
* Looks up ASN info for an IP.
*
* @param ip
* @return
*/
private static String getGeoAsn(String ip) {
return ipLookup.asnLookup(ip, true);
}
/**
* Looks up country info for an IP.
*
* @param ip
* @return
*/
private static String getGeoIpCountry(String ip) {
return ipLookup.countryLookup(ip);
}
/**
* Radius enrichment via HBase lookup.
*
* @param ip
* @return
*/
private static String radiusMatch(String ip) {
return HBaseUtils.getAccount(ip);
}
}
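For intuition, a standalone sketch of the suffix-pattern matching that getDomain performs above (the pattern is abbreviated to a few of the suffixes; the input is made up):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DomainDemo {
    // abbreviated version of WEB_PATTERN above
    private static final Pattern P = Pattern.compile("[^.]+(\\.com\\.cn|\\.com|\\.net|\\.cn|\\.org)");

    public static void main(String[] args) {
        Matcher m = P.matcher("www.baidu.com");
        System.out.println(m.find() ? m.group() : "");   // prints baidu.com
    }
}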


@@ -1,138 +0,0 @@
package cn.ac.iie.utils.hbase;
import cn.ac.iie.common.FlowWriteConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* HBase utility class
*
* @author qidaijie
*/
public class HBaseUtils {
private final static Logger logger = Logger.getLogger(HBaseUtils.class);
private static Map<String, String> subIdMap = new HashMap<>(333334);
// private static Map<String, String> subIdMap = new ConcurrentSkipListMap<>();
private static Connection connection;
private static Long time;
static {
// manage HBase configuration
Configuration configuration = HBaseConfiguration.create();
// set the zookeeper quorum
configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
configuration.set("hbase.client.retries.number", "3");
configuration.set("hbase.bulkload.retries.number", "3");
configuration.set("zookeeper.recovery.retry", "3");
try {
connection = ConnectionFactory.createConnection(configuration);
time = System.currentTimeMillis();
getAll();
} catch (IOException e) {
logger.error("获取HBase连接失败");
e.printStackTrace();
}
}
/**
* Refreshes the cached map with recent changes.
*/
public static void change() {
Long nowTime = System.currentTimeMillis();
timestampsFilter(time - 1000, nowTime + 500);
}
/**
* Fetches changes within a time range.
*
* @param startTime start time
* @param endTime end time
*/
private static void timestampsFilter(Long startTime, Long endTime) {
Long begin = System.currentTimeMillis();
Table table = null;
ResultScanner scanner = null;
Scan scan2 = new Scan();
try {
table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
scan2.setTimeRange(startTime, endTime);
scanner = table.getScanner(scan2);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
String key = Bytes.toString(CellUtil.cloneRow(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
if (subIdMap.containsKey(key)) {
if (!value.equals(subIdMap.get(key))) {
subIdMap.put(key, value);
}
} else {
subIdMap.put(key, value);
}
}
}
Long end = System.currentTimeMillis();
logger.warn("当前集合长度" + subIdMap.keySet().size());
logger.warn("更新缓存耗时:" + (end - begin) + "开始时间:" + begin + "结束时间:" + end);
time = endTime;
} catch (IOException e) {
e.printStackTrace();
} finally {
if (scanner != null) {
scanner.close();
}
if (table != null) {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* Loads all key-value pairs.
*/
private static void getAll() {
Long begin = System.currentTimeMillis();
try {
Table table = connection.getTable(TableName.valueOf("sub:" + FlowWriteConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
}
}
logger.warn("获取全量后集合长度:" + subIdMap.size());
logger.warn("获取全量耗时:" + (System.currentTimeMillis() - begin));
scanner.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Gets the account for a client IP.
*
* @param clientIp client_ip
* @return account
*/
public static String getAccount(String clientIp) {
return subIdMap.get(clientIp);
}
}


@@ -1,51 +0,0 @@
package cn.ac.iie.utils.http;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* Utility for fetching the gateway schema over HTTP.
*/
public class HttpClientUtil {
public static String requestByGetMethod(String s) {
CloseableHttpClient httpClient = HttpClients.createDefault();
StringBuilder entityStringBuilder = null;
try {
HttpGet get = new HttpGet(s);
CloseableHttpResponse httpResponse = null;
httpResponse = httpClient.execute(get);
try {
HttpEntity entity = httpResponse.getEntity();
entityStringBuilder = new StringBuilder();
if (null != entity) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
String line = null;
while ((line = bufferedReader.readLine()) != null) {
entityStringBuilder.append(line);
}
}
} finally {
httpResponse.close();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (httpClient != null) {
httpClient.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return entityStringBuilder.toString();
}
}


@@ -45,22 +45,6 @@ public class InfluxDbUtils {
}
}
/**
* Records match failures, i.e. keys with no counterpart in the in-memory map.
*
* @param failure number of match failures
*/
public static void sendHBaseFailure(int failure) {
if (failure != 0) {
InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD);
Point point1 = Point.measurement("sendHBaseFailure")
.tag("topic", FlowWriteConfig.KAFKA_TOPIC)
.field("failure", failure)
.build();
client.write("BusinessMonitor", "", point1);
}
}
/**
* Gets the local IP.
*


@@ -1,231 +0,0 @@
package cn.ac.iie.utils.json;
import cn.ac.iie.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;
import java.util.*;
/**
* Utility for parsing JSON with fastjson.
*/
public class JsonParseUtil {
/**
* Pattern match: maps a type-name string to a Class.
*
* @param type
* @return
*/
public static Class getClassName(String type) {
Class clazz;
switch (type) {
case "int":
clazz = Integer.class;
break;
case "String":
clazz = String.class;
break;
case "long":
clazz = long.class;
break;
case "Integer":
clazz = Integer.class;
break;
case "double":
clazz = double.class;
break;
case "float":
clazz = float.class;
break;
case "char":
clazz = char.class;
break;
case "byte":
clazz = byte.class;
break;
case "boolean":
clazz = boolean.class;
break;
case "short":
clazz = short.class;
break;
default:
clazz = String.class;
}
return clazz;
}
/**
* Generates an object via reflection (cglib BeanGenerator).
*
* @param properties
* @return the generated Object
*/
public static Object generateObject(Map properties) {
BeanGenerator generator = new BeanGenerator();
Set keySet = properties.keySet();
for (Iterator i = keySet.iterator(); i.hasNext(); ) {
String key = (String) i.next();
generator.addProperty(key, (Class) properties.get(key));
}
return generator.create();
}
/**
* Gets a property value.
*
* @param obj
* @param property
* @return the property value
*/
public static Object getValue(Object obj, String property) {
BeanMap beanMap = BeanMap.create(obj);
return beanMap.get(property);
}
/**
* Sets a property value.
*
* @param obj
* @param property
* @param value
*/
public static void setValue(Object obj, String property, Object value) {
BeanMap beanMap = BeanMap.create(obj);
beanMap.put(property, value);
}
/**
* Fetches the gateway schema from the given URL and builds the map used to reflectively generate an Object.
*
* @param http
* @return a map used to reflectively generate a schema-shaped object
*/
public static HashMap<String, Class> getMapFromhttp(String http) {
HashMap<String, Class> map = new HashMap<>();
String schema = HttpClientUtil.requestByGetMethod(http);
Object data = JSON.parseObject(schema).get("data");
// get fields as an array; each element carries name, doc, type
JSONObject schemaJson = JSON.parseObject(data.toString());
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
String name = JSON.parseObject(field.toString()).get("name").toString();
String type = JSON.parseObject(field.toString()).get("type").toString();
// if(
// name.equals("dns_qr") ||
// name.equals("dns_opcode") ||
// name.equals("ssl_pinningst") ||
// name.equals("ssl_intercept_state") ||
// name.equals("ssl_cert_verify")
//
// ){
// type="Integer";
// }
// assemble the map used to generate the entity class
map.put(name, getClassName(type));
}
return map;
}
/**
* 根据http链接获取schema解析之后返回一个任务列表 (useList toList funcList)
*
* @param http
* @return
*/
public static ArrayList<String[]> getJobListFromHttp(String http) {
ArrayList<String[]> list = new ArrayList<>();
String schema = HttpClientUtil.requestByGetMethod(http);
// parse data
Object data = JSON.parseObject(schema).get("data");
// get fields as an array; each element carries name, doc, type
JSONObject schemaJson = JSON.parseObject(data.toString());
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
Object doc = JSON.parseObject(field.toString()).get("doc");
String name = JSON.parseObject(field.toString()).get("name").toString();
if (doc != null) {
Object format = JSON.parseObject(doc.toString()).get("format");
if (format != null) {
String functions = null;
String appendTo = null;
String params = null;
Object functionsObj = JSON.parseObject(format.toString()).get("functions");
Object appendToObj = JSON.parseObject(format.toString()).get("appendTo");
Object paramObj = JSON.parseObject(format.toString()).get("param");
if (functionsObj != null) {
functions = functionsObj.toString();
}
if (appendToObj != null) {
appendTo = appendToObj.toString();
}
if (paramObj != null) {
params = paramObj.toString();
}
if (appendTo != null && params == null) {
String[] functionArray = functions.split(",");
String[] appendToArray = appendTo.split(",");
for (int i = 0; i < functionArray.length; i++) {
// useList.add(name);
// toList.add(appendToArray[i]);
// funcList.add(functionArray[i]);
list.add(new String[]{name, appendToArray[i], functionArray[i],null});
}
}else if (appendTo != null && params != null){
String[] functionArray = functions.split(",");
String[] appendToArray = appendTo.split(",");
String[] paramArray = params.split(",");
for (int i = 0; i < functionArray.length; i++) {
// useList.add(name);
// toList.add(appendToArray[i]);
// funcList.add(functionArray[i]);
list.add(new String[]{name, appendToArray[i], functionArray[i],paramArray[i]});
}
}
else {
// useList.add(name);
// funcList.add(functions.toString());
// toList.add(name);
list.add(new String[]{name, name, functions,params});
}
}
}
}
return list;
}
}
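For context, getJobListFromHttp expects the gateway to return a schema whose fields may carry a doc.format section; a hypothetical response fragment in that shape (the function names are ones handled by TransFormUtils earlier in this diff, the field names are illustrative):

{
  "data": {
    "fields": [
      {"name": "client_ip", "type": "String",
       "doc": {"format": {"functions": "geo_ip_detail", "appendTo": "client_location"}}},
      {"name": "recv_time", "type": "long",
       "doc": {"format": {"functions": "current_timestamp"}}}
    ]
  }
}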


@@ -59,7 +59,7 @@ public class KafkaLogNtc {
}
}
kafkaProducer.flush();
logger.debug("Log sent to National Center successfully!!!!!");
logger.warn("Log sent to National Center successfully!!!!!");
}
/**
@@ -72,10 +72,10 @@ public class KafkaLogNtc {
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
properties.put("linger.ms", "2");
properties.put("request.timeout.ms", 30000);
properties.put("request.timeout.ms", 60000);
properties.put("batch.size", 262144);
properties.put("buffer.memory", 33554432);
// properties.put("compression.type", "snappy");
properties.put("compression.type", "snappy");
kafkaProducer = new KafkaProducer<>(properties);
}


@@ -0,0 +1,79 @@
package cn.ac.iie.utils.redis;
import cn.ac.iie.common.FlowWriteConfig;
import org.apache.log4j.Logger;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;
/**
* Redis connection pool, intended for matching an IP to its subscriber name.
*
* @author my
* @date 2018-07-04
*/
public final class RedisClusterUtils {
private static final Logger logger = Logger.getLogger(RedisClusterUtils.class);
private static JedisCluster jedisCluster;
private static Properties props = new Properties();
static {
try {
String redisConfigFile = "redis_config.properties";
props.load(RedisClusterUtils.class.getClassLoader().getResourceAsStream(redisConfigFile));
} catch (IOException e) {
props = null;
logger.error("加载Redis配置文件失败", e);
}
}
/**
* Instances must not be created via new.
*/
private RedisClusterUtils() {
}
/**
* Initializes the Redis connection pool.
*/
private static JedisCluster getJedisCluster() {
if (jedisCluster == null) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXACTIVE)));
poolConfig.setMaxIdle(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXIDLE)));
poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXWAIT)));
poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONRETURN)));
poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONBORROW)));
Set<HostAndPort> nodes = new LinkedHashSet<HostAndPort>();
for (String port : props.getProperty(FlowWriteConfig.REDIS_PORT).split(FlowWriteConfig.SEGMENTATION)) {
for (String ip : props.getProperty(FlowWriteConfig.REDIS_IP).split(FlowWriteConfig.SEGMENTATION)) {
nodes.add(new HostAndPort(ip, Integer.parseInt(port)));
}
}
jedisCluster = new JedisCluster(nodes, poolConfig);
}
return jedisCluster;
}
/**
* Gets the subscriber name.
*
* @param key service_ip
* @return Subscribe_id
*/
public static String get(String key) {
String s = key.split("\\.")[0];
if (!FlowWriteConfig.CHECK_IP_SCOPE.contains(s)) {
jedisCluster = getJedisCluster();
return jedisCluster.get(key);
}
return "";
}
}


@@ -0,0 +1,115 @@
package cn.ac.iie.utils.redis;
import cn.ac.iie.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import org.apache.commons.lang3.RandomUtils;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.Properties;
/**
* @author qidaijie
*/
public class RedisPollUtils {
private static final Logger logger = Logger.getLogger(RedisPollUtils.class);
private static JedisPool jedisPool = null;
private static Properties props = new Properties();
private RedisPollUtils() {
}
static {
initialPool();
}
/**
* Initializes the Redis connection pool.
*/
private static void initialPool() {
try {
// load the pool config file
props.load(RedisPollUtils.class.getClassLoader().getResourceAsStream("redis_config.properties"));
// create the jedis pool config instance
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXACTIVE)));
poolConfig.setMaxIdle(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXIDLE)));
poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXWAIT)));
poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONRETURN)));
poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONBORROW)));
// instantiate the jedis pool from the config
jedisPool = new JedisPool(poolConfig, props.getProperty(FlowWriteConfig.REDIS_IP),
Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_PORT)));
} catch (Exception e) {
logger.error("Redis连接池初始化错误", e);
}
}
/**
* Gets a Jedis instance.
*
* @return Jedis instance
*/
public static Jedis getJedis() {
Jedis jedis = null;
try {
if (jedisPool == null) {
initialPool();
}
jedis = jedisPool.getResource();
} catch (Exception e) {
logger.error("Redis连接池错误,无法获取连接", e);
}
return jedis;
}
// /**
// * @param key redis key
// * @return value
// */
// public static Integer getWorkerId(String key) {
// int workId = 0;
// int maxId = 32;
// try (Jedis jedis = RedisPollUtils.getJedis()) {
// if (jedis != null) {
// String work = jedis.get(key);
// if (StringUtil.isBlank(work)) {
// jedis.set(key, "0");
// } else {
// workId = Integer.parseInt(work);
// }
// if (workId < maxId) {
// jedis.set(key, String.valueOf(workId + 1));
// } else {
// workId = 0;
// jedis.set(key, "1");
// }
// }
// } catch (Exception e) {
// logger.error("通过Redis获取用户名出现异常", e);
// workId = RandomUtils.nextInt(0, 31);
// }
// return workId;
// }
public static Integer getWorkerId(String key) {
int workId = 0;
try (Jedis jedis = RedisPollUtils.getJedis()) {
if (jedis != null) {
workId = Integer.parseInt(jedis.get(key));
jedis.set(key, String.valueOf(workId + 2));
logger.error("\n工作id是" + workId + "\n");
}
} catch (Exception e) {
logger.error("通过Redis获取用户名出现异常", e);
workId = RandomUtils.nextInt(0, 31);
}
return workId;
}
}


@@ -1,56 +0,0 @@
package cn.ac.iie.utils.system;
/**
* IP utility class
*
* @author qidaijie
*/
public class IpUtils {
/**
* IPv4 regex
*/
private static final String IPV4 = "^((\\d|[1-9]\\d|1\\d\\d|2([0-4]\\d|5[0-5]))\\.){4}$";
/**
* IPv6 regex
*/
private static final String IPV6 = "^(([\\da-fA-F]{1,4}):){8}$";
/**
* Determines the IP type, v4 or v6.
*
* @param ip IP
* @return 1:v4 2:v6 3:abnormal
*/
public static int validIPAddress(String ip) {
return String.format("%s.", ip).matches(IPV4) ? 1 : String.format("%s:", ip).matches(IPV6) ? 2 : 3;
}
/**
* Converts an IP string to an integer.
* The IP is a dot-separated string of integers; as in any radix-to-decimal conversion, sum the digits weighted by place value, the weight here being 256 (e.g. 10.0.0.1 = 10*256^3 + 0*256^2 + 0*256 + 1 = 167772161).
*
* @param ip IP
* @return ip(int)
*/
public static int ipChangeInt(String ip) {
// split the IP
String[] ipSplit = ip.split("\\.");
int result = 0;
for (int i = 0; i < 4; i++) {
Integer ipSubInteger = Integer.parseInt(ipSplit[i]);
// regex validation guarantees the value is non-negative
if (ipSubInteger > 255) {
result = 0;
break;
}
result += (ipSubInteger << (24 - i * 8));
}
return result;
}
public static void main(String[] args) {
System.out.println(validIPAddress("192.254.254.254"));
System.out.println(ipChangeInt("254.254.254.254"));
}
}


@@ -0,0 +1,190 @@
package cn.ac.iie.utils.system;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.zookeeper.DistributedLock;
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
import org.apache.log4j.Logger;
/**
* Snowflake ID algorithm
*
* @author qidaijie
*/
public class SnowflakeId {
private static Logger logger = Logger.getLogger(SnowflakeId.class);
// ==============================Fields===========================================
/**
* Epoch start (1564588800000L = 2019-08-01 00:00:00 UTC+8); the 39-bit timestamp field is good for roughly 17 years
*/
private final long twepoch = 1564588800000L;
/**
* number of bits for the worker id
*/
private final long workerIdBits = 6L;
/**
* number of bits for the data-center id
*/
private final long dataCenterIdBits = 4L;
/**
* maximum worker id, 63 here (this shift trick quickly yields the largest decimal value an n-bit binary number can hold)
*/
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
/**
* maximum data-center id, 15 here
*/
private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
/**
* number of bits for the sequence within the id
*/
private final long sequenceBits = 14L;
/**
* worker id is shifted left by 14 bits
*/
private final long workerIdShift = sequenceBits;
/**
* data-center id is shifted left by 20 bits (14+6)
*/
private final long dataCenterIdShift = sequenceBits + workerIdBits;
/**
* timestamp is shifted left by 24 bits (14+6+4)
*/
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
/**
* mask for the generated sequence, 16383 here
*/
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/**
* worker id (0~63)
*/
private long workerId;
/**
* data-center id (0~15)
*/
private long dataCenterId;
/**
* per-millisecond sequence (0~16383)
*/
private long sequence = 0L;
/**
* timestamp of the last generated ID
*/
private long lastTimestamp = -1L;
private static SnowflakeId idWorker;
private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
static {
idWorker = new SnowflakeId();
}
//==============================Constructors=====================================
/**
* Constructor
*/
private SnowflakeId() {
DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
lock.lock();
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
}
this.workerId = tmpWorkerId;
this.dataCenterId = dataCenterId;
}
// ==============================Methods==========================================
/**
* Gets the next ID (thread-safe).
*
* @return SnowflakeId
*/
private synchronized long nextId() {
long timestamp = timeGen();
// if the current time is before the last generation timestamp, the clock has moved backwards; throw
if (timestamp < lastTimestamp) {
throw new RuntimeException(
String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}
// same millisecond: advance the in-millisecond sequence
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
// sequence overflow within this millisecond
if (sequence == 0) {
// block until the next millisecond for a fresh timestamp
timestamp = tilNextMillis(lastTimestamp);
}
}
//timestamp changed: reset the in-millisecond sequence
else {
sequence = 0L;
}
//record the timestamp of this generation
lastTimestamp = timestamp;
//shift and OR the pieces together into a 64-bit ID
return ((timestamp - twepoch) << timestampLeftShift)
| (dataCenterId << dataCenterIdShift)
| (workerId << workerIdShift)
| sequence;
}
/**
* Blocks until the next millisecond, i.e. until a new timestamp is obtained.
*
* @param lastTimestamp timestamp of the last generated ID
* @return current timestamp
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
/**
* Returns the current time in milliseconds.
*
* @return current time (ms)
*/
protected long timeGen() {
return System.currentTimeMillis();
}
/**
* Static accessor.
*
* @return
*/
public static Long generateId() {
return idWorker.nextId();
}
}
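Given the field widths above, an ID can be decomposed with the same shifts used to build it; a minimal sketch (the constants mirror the class above, the sample ID is arbitrary):

public class SnowflakeDecodeDemo {
    public static void main(String[] args) {
        long twepoch = 1564588800000L;                    // epoch start used above
        long id = 1234567890123456L;                      // arbitrary sample ID
        long sequence = id & ((1L << 14) - 1);            // low 14 bits
        long workerId = (id >> 14) & ((1L << 6) - 1);     // next 6 bits
        long dataCenterId = (id >> 20) & ((1L << 4) - 1); // next 4 bits
        long timestampMs = (id >> 24) + twepoch;          // milliseconds since the Unix epoch
        System.out.println(timestampMs + " dc=" + dataCenterId + " worker=" + workerId + " seq=" + sequence);
    }
}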


@@ -1,5 +1,7 @@
package cn.ac.iie.utils.zookeeper;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.system.SnowflakeId;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
@@ -13,7 +15,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author qidaijie
*/
public class DistributedLock implements Lock, Watcher {
private static Logger logger = Logger.getLogger(DistributedLock.class);
@@ -79,7 +83,7 @@ public class DistributedLock implements Lock, Watcher {
}
try {
if (this.tryLock()) {
logger.warn(Thread.currentThread().getName() + " " + lockName + " is being locked......");
System.out.println(Thread.currentThread().getName() + " " + lockName + " acquired the lock");
} else {
// wait for the lock
waitForLock(waitLock, sessionTimeout);
@@ -94,7 +98,7 @@ public class DistributedLock implements Lock, Watcher {
try {
String splitStr = "_lock_";
if (lockName.contains(splitStr)) {
throw new LockException("locked name is error!!!");
throw new LockException("锁名有误");
}
// create an ephemeral sequential node
currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
@@ -183,4 +187,33 @@ public class DistributedLock implements Lock, Watcher {
super(e);
}
}
public static void main(String[] args) {
ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
Runnable runnable = new Runnable() {
@Override
public void run() {
DistributedLock lock = null;
try {
lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
lock.lock();
// System.out.println(SnowflakeId.generateId());
System.out.println(1);
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock != null) {
lock.unlock();
}
}
}
};
for (int i = 0; i < 10; i++) {
Thread t = new Thread(runnable);
t.start();
}
}
}


@@ -0,0 +1,134 @@
package cn.ac.iie.utils.zookeeper;
import cn.ac.iie.common.FlowWriteConfig;
import org.apache.commons.lang3.RandomUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author qidaijie
*/
public class ZookeeperUtils implements Watcher {
private static Logger logger = Logger.getLogger(ZookeeperUtils.class);
private ZooKeeper zookeeper;
private static final int SESSION_TIME_OUT = 20000;
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
/**
* Updates node data.
*
* @param path node path
*/
public int modifyNode(String path) {
createNode("/Snowflake", null, ZooDefs.Ids.OPEN_ACL_UNSAFE);
createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
int workerId;
try {
connectZookeeper();
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
if (workerId > 55) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
} else {
String result = String.valueOf(workerId + 1);
if (stat != null) {
zookeeper.setData(path, result.getBytes(), stat.getVersion());
} else {
logger.error("Node does not exist!,Can't modify");
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
workerId = RandomUtils.nextInt(56, 63);
} finally {
closeConn();
}
logger.error("工作ID是" + workerId);
return workerId;
}
/**
* Connects to zookeeper.
*
*/
private void connectZookeeper() {
try {
zookeeper = new ZooKeeper(FlowWriteConfig.ZOOKEEPER_SERVERS, SESSION_TIME_OUT, this);
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* Closes the connection.
*/
private void closeConn() {
try {
if (zookeeper != null) {
zookeeper.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Gets node content.
*
* @param path node path
* @return content, or null on exception
*/
private String getNodeDate(String path) {
String result = null;
Stat stat = new Stat();
try {
byte[] resByte = zookeeper.getData(path, true, stat);
result = new String(resByte);
} catch (KeeperException | InterruptedException e) {
logger.error("Get node information exception");
e.printStackTrace();
}
return result;
}
/**
* @param path path at which to create the node
* @param date byte[] of the data the node stores
* @param acls access-control policy
*/
private void createNode(String path, byte[] date, List<ACL> acls) {
try {
connectZookeeper();
Stat exists = zookeeper.exists(path, true);
if (exists == null) {
zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
} else {
logger.warn("Node already exists!,Don't need to create");
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
} finally {
closeConn();
}
}
}


@@ -12,7 +12,7 @@ log4j.appender.file.Threshold=error
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
# use a relative path; verify in testing that output lands under the application directory
log4j.appender.file.file=storm-topology.log
log4j.appender.file.file=galaxy-name.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n