提交线上使用版本

This commit is contained in:
qidaijie
2020-12-25 17:32:54 +08:00
parent 5438144b07
commit e6c602a154
43 changed files with 1446 additions and 2299 deletions

View File

@@ -1,672 +0,0 @@
package cn.ac.iie.bean;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.support.spring.annotation.FastJsonFilter;
/**
* @author qidaijie
*/
public class SessionRecordLog {
private long uid;
private int policy_id;
private long action;
private int start_time;
private int end_time;
private long recv_time;
private String trans_proto;
private String app_proto;
private int addr_type;
private String server_ip;
private String client_ip;
private int server_port;
private int client_port;
private int service;
private int entrance_id;
private int device_id;
private int Link_id;
private String isp;
private int encap_type;
private int direction;
private int stream_dir;
private String cap_ip;
private String addr_list;
private String server_location;
private String client_location;
private String client_asn;
private String server_asn;
private String subscribe_id;
private long con_duration_ms;
private String url;
private String host;
private String domain;
private String category;
private String req_line;
private String res_line;
private String cookie;
private String referer;
private String user_agent;
private String content_len;
private String content_type;
private String set_cookie;
private String req_header;
private String resp_header;
private String req_body_key;
private String req_body;
private String res_body_key;
private String resp_body;
private String version;
private String sni;
private String san;
private String cn;
private int app_id;
private int protocol_id;
private long con_latency_ms;
private int pinningst;
private int intercept_state;
private long ssl_server_side_latency;
private long ssl_client_side_latency;
private String ssl_server_side_version;
private String ssl_client_side_version;
private int ssl_cert_verify;
private String stream_trace_id;
private String ssl_error;
private long c2s_pkt_num;
private long S2c_pkt_num;
private long c2s_byte_num;
private long s2c_byte_num;
private String nas_ip;
private String framed_ip;
private String account;
private int packet_type;
private int has_dup_traffic;
private String stream_error;
public SessionRecordLog() {
}
public long getUid() {
return uid;
}
public void setUid(long uid) {
this.uid = uid;
}
public int getPolicy_id() {
return policy_id;
}
public void setPolicy_id(int policy_id) {
this.policy_id = policy_id;
}
public long getAction() {
return action;
}
public void setAction(long action) {
this.action = action;
}
public int getStart_time() {
return start_time;
}
public void setStart_time(int start_time) {
this.start_time = start_time;
}
public int getEnd_time() {
return end_time;
}
public void setEnd_time(int end_time) {
this.end_time = end_time;
}
public String getSsl_error() {
return ssl_error;
}
public void setSsl_error(String ssl_error) {
this.ssl_error = ssl_error;
}
public String getApp_proto() {
return app_proto;
}
public void setApp_proto(String app_proto) {
this.app_proto = app_proto;
}
public long getRecv_time() {
return recv_time;
}
public void setRecv_time(long recv_time) {
this.recv_time = recv_time;
}
public String getTrans_proto() {
return trans_proto;
}
public void setTrans_proto(String trans_proto) {
this.trans_proto = trans_proto;
}
public int getAddr_type() {
return addr_type;
}
public void setAddr_type(int addr_type) {
this.addr_type = addr_type;
}
public String getServer_ip() {
return server_ip;
}
public void setServer_ip(String server_ip) {
this.server_ip = server_ip;
}
public String getClient_ip() {
return client_ip;
}
public void setClient_ip(String client_ip) {
this.client_ip = client_ip;
}
public int getServer_port() {
return server_port;
}
public void setServer_port(int server_port) {
this.server_port = server_port;
}
public int getClient_port() {
return client_port;
}
public void setClient_port(int client_port) {
this.client_port = client_port;
}
public int getService() {
return service;
}
public void setService(int service) {
this.service = service;
}
public int getEntrance_id() {
return entrance_id;
}
public void setEntrance_id(int entrance_id) {
this.entrance_id = entrance_id;
}
public int getDevice_id() {
return device_id;
}
public void setDevice_id(int device_id) {
this.device_id = device_id;
}
public int getLink_id() {
return Link_id;
}
public void setLink_id(int link_id) {
Link_id = link_id;
}
public String getIsp() {
return isp;
}
public void setIsp(String isp) {
this.isp = isp;
}
public int getEncap_type() {
return encap_type;
}
public void setEncap_type(int encap_type) {
this.encap_type = encap_type;
}
public int getDirection() {
return direction;
}
public void setDirection(int direction) {
this.direction = direction;
}
public int getStream_dir() {
return stream_dir;
}
public void setStream_dir(int stream_dir) {
this.stream_dir = stream_dir;
}
public String getCap_ip() {
return cap_ip;
}
public void setCap_ip(String cap_ip) {
this.cap_ip = cap_ip;
}
public String getAddr_list() {
return addr_list;
}
public void setAddr_list(String addr_list) {
this.addr_list = addr_list;
}
public String getServer_location() {
return server_location;
}
public void setServer_location(String server_location) {
this.server_location = server_location;
}
public String getClient_location() {
return client_location;
}
public void setClient_location(String client_location) {
this.client_location = client_location;
}
public String getClient_asn() {
return client_asn;
}
public void setClient_asn(String client_asn) {
this.client_asn = client_asn;
}
public String getServer_asn() {
return server_asn;
}
public void setServer_asn(String server_asn) {
this.server_asn = server_asn;
}
public String getSubscribe_id() {
return subscribe_id;
}
public void setSubscribe_id(String subscribe_id) {
this.subscribe_id = subscribe_id;
}
public long getCon_duration_ms() {
return con_duration_ms;
}
public void setCon_duration_ms(long con_duration_ms) {
this.con_duration_ms = con_duration_ms;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getDomain() {
return domain;
}
public void setDomain(String domain) {
this.domain = domain;
}
public String getCategory() {
return category;
}
public void setCategory(String category) {
this.category = category;
}
public String getReq_line() {
return req_line;
}
public void setReq_line(String req_line) {
this.req_line = req_line;
}
public String getRes_line() {
return res_line;
}
public void setRes_line(String res_line) {
this.res_line = res_line;
}
public String getCookie() {
return cookie;
}
public void setCookie(String cookie) {
this.cookie = cookie;
}
public String getReferer() {
return referer;
}
public void setReferer(String referer) {
this.referer = referer;
}
public String getUser_agent() {
return user_agent;
}
public void setUser_agent(String user_agent) {
this.user_agent = user_agent;
}
public String getContent_len() {
return content_len;
}
public void setContent_len(String content_len) {
this.content_len = content_len;
}
public String getContent_type() {
return content_type;
}
public void setContent_type(String content_type) {
this.content_type = content_type;
}
public String getSet_cookie() {
return set_cookie;
}
public void setSet_cookie(String set_cookie) {
this.set_cookie = set_cookie;
}
public String getReq_header() {
return req_header;
}
public void setReq_header(String req_header) {
this.req_header = req_header;
}
public String getResp_header() {
return resp_header;
}
public void setResp_header(String resp_header) {
this.resp_header = resp_header;
}
public String getReq_body_key() {
return req_body_key;
}
public void setReq_body_key(String req_body_key) {
this.req_body_key = req_body_key;
}
public String getReq_body() {
return req_body;
}
public void setReq_body(String req_body) {
this.req_body = req_body;
}
public String getRes_body_key() {
return res_body_key;
}
public void setRes_body_key(String res_body_key) {
this.res_body_key = res_body_key;
}
public String getResp_body() {
return resp_body;
}
public void setResp_body(String resp_body) {
this.resp_body = resp_body;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getSni() {
return sni;
}
public void setSni(String sni) {
this.sni = sni;
}
public String getSan() {
return san;
}
public void setSan(String san) {
this.san = san;
}
public String getCn() {
return cn;
}
public void setCn(String cn) {
this.cn = cn;
}
public int getApp_id() {
return app_id;
}
public void setApp_id(int app_id) {
this.app_id = app_id;
}
public int getProtocol_id() {
return protocol_id;
}
public void setProtocol_id(int protocol_id) {
this.protocol_id = protocol_id;
}
public int getIntercept_state() {
return intercept_state;
}
public void setIntercept_state(int intercept_state) {
this.intercept_state = intercept_state;
}
public long getSsl_server_side_latency() {
return ssl_server_side_latency;
}
public void setSsl_server_side_latency(long ssl_server_side_latency) {
this.ssl_server_side_latency = ssl_server_side_latency;
}
public long getSsl_client_side_latency() {
return ssl_client_side_latency;
}
public void setSsl_client_side_latency(long ssl_client_side_latency) {
this.ssl_client_side_latency = ssl_client_side_latency;
}
public String getSsl_server_side_version() {
return ssl_server_side_version;
}
public void setSsl_server_side_version(String ssl_server_side_version) {
this.ssl_server_side_version = ssl_server_side_version;
}
public String getSsl_client_side_version() {
return ssl_client_side_version;
}
public void setSsl_client_side_version(String ssl_client_side_version) {
this.ssl_client_side_version = ssl_client_side_version;
}
public int getSsl_cert_verify() {
return ssl_cert_verify;
}
public void setSsl_cert_verify(int ssl_cert_verify) {
this.ssl_cert_verify = ssl_cert_verify;
}
public String getStream_trace_id() {
return stream_trace_id;
}
public void setStream_trace_id(String stream_trace_id) {
this.stream_trace_id = stream_trace_id;
}
public long getCon_latency_ms() {
return con_latency_ms;
}
public void setCon_latency_ms(long con_latency_ms) {
this.con_latency_ms = con_latency_ms;
}
public int getPinningst() {
return pinningst;
}
public void setPinningst(int pinningst) {
this.pinningst = pinningst;
}
public long getC2s_pkt_num() {
return c2s_pkt_num;
}
public void setC2s_pkt_num(long c2s_pkt_num) {
this.c2s_pkt_num = c2s_pkt_num;
}
public long getS2c_pkt_num() {
return S2c_pkt_num;
}
public void setS2c_pkt_num(long s2c_pkt_num) {
S2c_pkt_num = s2c_pkt_num;
}
public long getC2s_byte_num() {
return c2s_byte_num;
}
public void setC2s_byte_num(long c2s_byte_num) {
this.c2s_byte_num = c2s_byte_num;
}
public long getS2c_byte_num() {
return s2c_byte_num;
}
public void setS2c_byte_num(long s2c_byte_num) {
this.s2c_byte_num = s2c_byte_num;
}
public String getNas_ip() {
return nas_ip;
}
public void setNas_ip(String nas_ip) {
this.nas_ip = nas_ip;
}
public String getFramed_ip() {
return framed_ip;
}
public void setFramed_ip(String framed_ip) {
this.framed_ip = framed_ip;
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public int getPacket_type() {
return packet_type;
}
public void setPacket_type(int packet_type) {
this.packet_type = packet_type;
}
public int getHas_dup_traffic() {
return has_dup_traffic;
}
public void setHas_dup_traffic(int has_dup_traffic) {
this.has_dup_traffic = has_dup_traffic;
}
public String getStream_error() {
return stream_error;
}
public void setStream_error(String stream_error) {
this.stream_error = stream_error;
}
}

View File

@@ -0,0 +1,72 @@
package cn.ac.iie.bolt;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.general.SnowflakeId;
import cn.ac.iie.utils.hbase.HBaseUtils;
import cn.ac.iie.utils.system.TupleUtils;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
import static cn.ac.iie.utils.general.TransFormUtils.dealCommonMessage;
/**
* @author qidaijie
*/
public class CompletionBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(CompletionBolt.class);
private static final long serialVersionUID = 9006119186526123734L;
private static final String IS = "yes";
@Override
public void prepare(Map stormConf, TopologyContext context) {
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
// if (TupleUtils.isTick(tuple)) {
// if (IS.equals(FlowWriteConfig.NEED_COMPLETE_SUBID)) {
// HBaseUtils.change();
// }
// } else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(dealCommonMessage(message)));
}
// }
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
e.printStackTrace();
}
}
// @Override
// public Map<String, Object> getComponentConfiguration() {
// Map<String, Object> conf = new HashMap<String, Object>(16);
// conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
// FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);
// return conf;
// }
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("tsgLog"));
}
}

View File

@@ -1,48 +0,0 @@
package cn.ac.iie.bolt;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
import static cn.ac.iie.utils.general.TransFormUtils.getJsonMessage;
/**
* 通联关系日志补全
*
* @author qidaijie
*/
public class ConnCompletionBolt extends BaseBasicBolt {
private static final long serialVersionUID = -1059151670138465894L;
private final static Logger logger = Logger.getLogger(ConnCompletionBolt.class);
@Override
public void prepare(Map stormConf, TopologyContext context) {
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(getJsonMessage(message)));
}
} catch (Exception e) {
logger.error("接收解析过程出现异常", e);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("connLog"));
}
}

View File

@@ -1,65 +0,0 @@
package cn.ac.iie.bolt;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.influxdb.InfluxDbUtils;
import cn.ac.iie.utils.system.TupleUtils;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import java.util.HashMap;
import java.util.Map;
/**
* 统计总数bolt用于将统计后的数入influxDB
*
* @author antlee
* @date 2018/8/14
*/
public class SummaryBolt extends BaseBasicBolt {
private static final long serialVersionUID = 4614020687381536301L;
private static Logger logger = Logger.getLogger(SummaryBolt.class);
private static long sum = 0L;
@Override
public void prepare(Map stormConf, TopologyContext context) {
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
if (TupleUtils.isTick(tuple)) {
InfluxDbUtils.sendKafkaSuccess(sum);
sum = 0L;
} else {
long successfulSum = tuple.getLong(0);
sum += successfulSum;
}
} catch (Exception e) {
logger.error("计数写入influxDB出现异常 ", e);
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
return conf;
}
}

View File

@@ -1,18 +1,15 @@
package cn.ac.iie.bolt;
package cn.ac.iie.bolt.kafka;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.influxdb.InfluxDbUtils;
import cn.ac.iie.utils.kafka.KafkaLogSend;
import cn.ac.iie.utils.system.TupleUtils;
import cn.ac.iie.utils.kafka.KafkaLogNtc;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.LinkedList;
@@ -23,18 +20,17 @@ import java.util.Map;
* @author qidaijie
* @date 2018/8/14
*/
public class NtcLogSendBolt extends BaseBasicBolt {
private static final long serialVersionUID = 3940515789830317517L;
private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
public class LogSendBolt extends BaseBasicBolt {
private static final long serialVersionUID = -3663610927224396615L;
private static Logger logger = Logger.getLogger(LogSendBolt.class);
private List<String> list;
private KafkaLogNtc kafkaLogNtc;
private static long successfulSum = 0;
private KafkaLogSend kafkaLogSend;
@Override
public void prepare(Map stormConf, TopologyContext context) {
list = new LinkedList<>();
kafkaLogNtc = KafkaLogNtc.getInstance();
kafkaLogSend = KafkaLogSend.getInstance();
}
@Override
@@ -42,25 +38,21 @@ public class NtcLogSendBolt extends BaseBasicBolt {
try {
if (TupleUtils.isTick(tuple)) {
if (list.size() != 0) {
kafkaLogNtc.sendMessage(list);
successfulSum += list.size();
kafkaLogSend.sendMessage(list);
list.clear();
}
basicOutputCollector.emit(new Values(successfulSum));
successfulSum = 0L;
} else {
String message = tuple.getString(0);
String message = tuple.getValue(0).toString();
if (StringUtil.isNotBlank(message)) {
list.add(message);
}
if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) {
kafkaLogNtc.sendMessage(list);
successfulSum += list.size();
kafkaLogSend.sendMessage(list);
list.clear();
}
}
} catch (Exception e) {
logger.error("日志发送Kafka过程出现异常 ", e);
logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
e.printStackTrace();
}
}
@@ -74,7 +66,6 @@ public class NtcLogSendBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("suc"));
}
}

View File

@@ -8,25 +8,29 @@ import cn.ac.iie.utils.system.FlowWriteConfigurations;
*/
public class FlowWriteConfig {
public static final String LOG_STRING_SPLITTER = "\t";
public static final String SQL_STRING_SPLITTER = "#";
public static final String SEGMENTATION = ",";
public static final int IF_PARAM_LENGTH = 3;
public static final String VISIBILITY = "disabled";
public static final String FORMAT_SPLITTER = ",";
public static final String IS_JSON_KEY_TAG = "$.";
public static final String IF_CONDITION_SPLITTER = "=";
/**
* System
*/
public static final Integer SPOUT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "spout.parallelism");
public static final Integer DATACENTER_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "datacenter.bolt.parallelism");
public static final Integer COMPLETION_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "completion.bolt.parallelism");
public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers");
public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks");
public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time");
public static final Integer BATCH_INSERT_NUM = FlowWriteConfigurations.getIntProperty(0, "batch.insert.num");
public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
public static final String CHECK_IP_SCOPE = FlowWriteConfigurations.getStringProperty(0, "check.ip.scope");
public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
public static final String NEED_COMPLETE_SUBID = FlowWriteConfigurations.getStringProperty(0, "need.complete.subid");
public static final String LOG_NEED_COMPLETE = FlowWriteConfigurations.getStringProperty(0, "log.need.complete");
/**
* influxDB
@@ -38,26 +42,30 @@ public class FlowWriteConfig {
/**
* kafka
*/
public static final String BOOTSTRAP_SERVERS = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers");
public static final String INPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "input.kafka.servers");
public static final String OUTPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "output.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(0, "hbase.table.name");
public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
public static final String RESULTS_OUTPUT_TOPIC = FlowWriteConfigurations.getStringProperty(0, "results.output.topic");
public static final String KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "kafka.topic");
public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset");
public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack");
public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
/***
* Redis
/**
* kafka限流配置-20201117
*/
public static final String REDIS_IP = "redis.ip";
public static final String REDIS_PORT = "redis.port";
public static final String REDIS_TIMEOUT = "redis.timeout";
public static final String REDIS_POOL_MAXACTIVE = "redis.pool.maxActive";
public static final String REDIS_POOL_MAXIDLE = "redis.pool.maxIdle";
public static final String REDIS_POOL_MAXWAIT = "redis.pool.maxWait";
public static final String REDIS_POOL_TESTONBORROW = "redis.pool.testOnBorrow";
public static final String REDIS_POOL_TESTONRETURN = "redis.pool.testOnReturn";
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
public static final String CONSUMER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "consumer.client.id");
public static final String PRODUCER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "producer.client.id");
/**
* http
*/
public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");
}

View File

@@ -0,0 +1,20 @@
package cn.ac.iie.common;
import cn.ac.iie.utils.system.FlowWriteConfigurations;
/**
* @author Administrator
*/
public class KafkaProConfig {
public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
}

View File

@@ -1,6 +1,7 @@
package cn.ac.iie.spout;
import cn.ac.iie.common.FlowWriteConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -31,14 +32,45 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
private static Properties createConsumerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS);
props.put("bootstrap.servers", FlowWriteConfig.INPUT_KAFKA_SERVERS);
props.put("group.id", FlowWriteConfig.GROUP_ID);
props.put("session.timeout.ms", "60000");
props.put("max.poll.records", 3000);
props.put("max.partition.fetch.bytes", 31457280);
props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET);
// switch (FlowWriteConfig.KAFKA_TOPIC) {
// case "PROXY-EVENT-LOG":
// props.put("client.id", "proxy");
// break;
// case "RADIUS-RECORD-LOG":
// props.put("client.id", "radius");
// break;
// case "CONNECTION-RECORD-LOG":
// props.put("client.id", "connection");
// break;
// case "SECURITY-EVENT-LOG":
// props.put("client.id", "security");
// break;
// case "CONNECTION-SKETCH":
// props.put("client.id", "sketch");
// break;
// case "ACTIVE-DEFENCE-EVENT-LOG":
// props.put("client.id", "active");
// break;
// case "SYS-PACKET-CAPTURE-LOG":
// props.put("client.id", "packet");
// break;
//
// default:
// }
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/**
* kafka限流配置-20201117
*/
props.put(ConsumerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.CONSUMER_CLIENT_ID);
return props;
}

View File

@@ -1,9 +1,8 @@
package cn.ac.iie.topology;
import cn.ac.iie.bolt.ConnCompletionBolt;
import cn.ac.iie.bolt.NtcLogSendBolt;
import cn.ac.iie.bolt.SummaryBolt;
import cn.ac.iie.bolt.CompletionBolt;
import cn.ac.iie.bolt.kafka.LogSendBolt;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.spout.CustomizedKafkaSpout;
import org.apache.log4j.Logger;
@@ -31,10 +30,10 @@ public class LogFlowWriteTopology {
private LogFlowWriteTopology(String topologyName) {
this.topologyName = topologyName;
topologyConfig = createTopologConfig();
topologyConfig = createTopologyConfig();
}
private Config createTopologConfig() {
private Config createTopologyConfig() {
Config conf = new Config();
conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
@@ -56,33 +55,40 @@ public class LogFlowWriteTopology {
}
private void buildTopology() {
String need = "yes";
builder = new TopologyBuilder();
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt");
// builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt");
if (need.equals(FlowWriteConfig.LOG_NEED_COMPLETE)) {
builder.setBolt("LogCompletionBolt", new CompletionBolt(),
FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("CompletionLogSendBolt", new LogSendBolt(),
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogCompletionBolt");
} else {
builder.setBolt("LogSendBolt", new LogSendBolt(),
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
}
}
public static void main(String[] args) throws Exception {
LogFlowWriteTopology csst = null;
LogFlowWriteTopology flowWriteTopology;
boolean runLocally = true;
String parameter = "remote";
int size = 2;
if (args.length >= size && parameter.equalsIgnoreCase(args[1])) {
runLocally = false;
csst = new LogFlowWriteTopology(args[0]);
flowWriteTopology = new LogFlowWriteTopology(args[0]);
} else {
csst = new LogFlowWriteTopology();
flowWriteTopology = new LogFlowWriteTopology();
}
csst.buildTopology();
flowWriteTopology.buildTopology();
if (runLocally) {
logger.info("执行本地模式...");
csst.runLocally();
flowWriteTopology.runLocally();
} else {
logger.info("执行远程部署模式...");
csst.runRemotely();
flowWriteTopology.runRemotely();
}
}
}

View File

@@ -1,68 +0,0 @@
package cn.ac.iie.utils.general;
import org.apache.log4j.Logger;
import java.security.MessageDigest;
/**
* 描述:转换MD5工具类
*
* @author Administrator
* @create 2018-08-13 15:11
*/
public class EncryptionUtils {
private static Logger logger = Logger.getLogger(EncryptionUtils.class);
public static String md5Encode(String msg) throws Exception {
try {
byte[] msgBytes = msg.getBytes("utf-8");
/*
* 声明使用Md5算法,获得MessaDigest对象
*/
MessageDigest md5 = MessageDigest.getInstance("MD5");
/*
* 使用指定的字节更新摘要
*/
md5.update(msgBytes);
/*
* 完成哈希计算,获得密文
*/
byte[] digest = md5.digest();
/*
* 以上两行代码等同于
* byte[] digest = md5.digest(msgBytes);
*/
return byteArr2hexString(digest);
} catch (Exception e) {
logger.error("Error in conversion MD5! " + msg);
return "";
}
}
/**
* 将byte数组转化为16进制字符串形式
*
* @param bys 字节数组
* @return 字符串
*/
private static String byteArr2hexString(byte[] bys) {
StringBuilder hexVal = new StringBuilder();
int val = 0;
for (byte by : bys) {
//将byte转化为int 如果byte是一个负数就必须要和16进制的0xff做一次与运算
val = ((int) by) & 0xff;
if (val < 16) {
hexVal.append("0");
}
hexVal.append(Integer.toHexString(val));
}
return hexVal.toString();
}
public static void main(String[] args) {
}
}

View File

@@ -1,8 +1,9 @@
package cn.ac.iie.utils.system;
package cn.ac.iie.utils.general;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.zookeeper.DistributedLock;
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
import com.zdjizhi.utils.ZooKeeperLock;
import org.apache.log4j.Logger;
/**
@@ -13,11 +14,18 @@ import org.apache.log4j.Logger;
public class SnowflakeId {
private static Logger logger = Logger.getLogger(SnowflakeId.class);
// ==============================Fields===========================================
/**
* 开始时间截 (2018-08-01 00:00:00) max 17years
* 共64位 第一位为符号位 默认0
* 时间戳 39位(17 year), centerId:(关联每个环境或任务数) :7位(0-127),
* workerId(关联进程):6(0-63) ,序列号11位(2047/ms)
*
* 序列号 /ms = (-1L ^ (-1L << 11))
* 最大使用年 = (1L << 39) / (1000L * 60 * 60 * 24 * 365)
*/
private final long twepoch = 1564588800000L;
/**
* 开始时间截 (2020-11-14 00:00:00) max 17years
*/
private final long twepoch = 1605283200000L;
/**
* 机器id所占的位数
@@ -27,22 +35,23 @@ public class SnowflakeId {
/**
* 数据标识id所占的位数
*/
private final long dataCenterIdBits = 4L;
private final long dataCenterIdBits = 7L;
/**
* 支持的最大机器id结果是3 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
* 支持的最大机器id结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
* M << n = M * 2^n
*/
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
/**
* 支持的最大数据标识id结果是15
* 支持的最大数据标识id结果是127
*/
private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
/**
* 序列在id中占的位数
*/
private final long sequenceBits = 14L;
private final long sequenceBits = 11L;
/**
* 机器ID向左移12位
@@ -60,7 +69,7 @@ public class SnowflakeId {
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
/**
* 生成序列的掩码这里为16383
* 生成序列的掩码这里为2047
*/
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
@@ -70,12 +79,12 @@ public class SnowflakeId {
private long workerId;
/**
* 数据中心ID(0~15)
* 数据中心ID(0~127)
*/
private long dataCenterId;
/**
* 毫秒内序列(0~16383)
* 毫秒内序列(0~2047)
*/
private long sequence = 0L;
@@ -85,12 +94,18 @@ public class SnowflakeId {
private long lastTimestamp = -1L;
/**
* 设置允许时间回拨的最大限制10s
*/
private static final long rollBackTime = 10000L;
private static SnowflakeId idWorker;
private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
static {
idWorker = new SnowflakeId();
idWorker = new SnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM);
}
//==============================Constructors=====================================
@@ -98,19 +113,46 @@ public class SnowflakeId {
/**
* 构造函数
*/
private SnowflakeId() {
private SnowflakeId(String zookeeperIp, long dataCenterIdNum) {
DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
try {
lock.lock();
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC);
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp);
if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
int dataCenterId = FlowWriteConfig.DATA_CENTER_ID_NUM;
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId));
}
this.workerId = tmpWorkerId;
this.dataCenterId = dataCenterId;
this.dataCenterId = dataCenterIdNum;
} catch (Exception e) {
e.printStackTrace();
logger.error("This is not usual error!!!===>>>" + e + "<<<===");
}finally {
lock.unlock();
}
// ZooKeeperLock lock = new ZooKeeperLock(zookeeperIp, "/locks", "disLocks1");
// if (lock.lock()) {
// int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp);
// if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
// throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
// }
// if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
// throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId));
// }
// this.workerId = tmpWorkerId;
// this.dataCenterId = dataCenterIdNum;
// try {
// lock.unlock();
// } catch (InterruptedException ie) {
// ie.printStackTrace();
// } catch (Exception e) {
// e.printStackTrace();
// logger.error("This is not usual error!!!===>>>" + e + "<<<===");
// }
// }
}
// ==============================Methods==========================================
@@ -122,7 +164,10 @@ public class SnowflakeId {
*/
private synchronized long nextId() {
long timestamp = timeGen();
//设置一个允许回拨限制时间系统时间回拨范围在rollBackTime内可以等待校准
if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) {
timestamp = tilNextMillis(lastTimestamp);
}
//如果当前时间小于上一次ID生成的时间戳说明系统时钟回退过这个时候应当抛出异常
if (timestamp < lastTimestamp) {
throw new RuntimeException(

View File

@@ -1,123 +1,165 @@
package cn.ac.iie.utils.general;
import cn.ac.iie.bean.SessionRecordLog;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.redis.RedisPollUtils;
import cn.ac.iie.utils.system.SnowflakeId;
import cn.ac.iie.utils.zookeeper.DistributedLock;
import cn.ac.iie.utils.zookeeper.ZookeeperUtils;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.json.JsonParseUtil;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.Encodes;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.JsonMapper;
import org.apache.log4j.Logger;
import java.util.*;
import static cn.ac.iie.utils.general.TransFunction.*;
/**
* 描述:转换或补全工具类
*
* @author qidaijie
* @create 2018-08-13 15:11
*/
public class TransFormUtils {
private static Logger logger = Logger.getLogger(TransFormUtils.class);
private static Pattern WEB_PATTERN = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)");
/**
* 在内存中加载反射类用的map
*/
private static HashMap<String, Class> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
/**
* 反射成一个类
*/
private static Object mapObject = JsonParseUtil.generateObject(map);
/**
* 获取任务列表
* list的每个元素是一个四元字符串数组 (有format标识的字段补全的字段用到的功能函数用到的参数),例如:
* (mail_subject mail_subject decode_of_base64 mail_subject_charset)
*/
private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
/**
* 补全工具类
*/
// private static FormatUtils build = new FormatUtils.Builder(false).build();
/**
* IP定位库工具类
*/
private static IpLookup ipLookup = new IpLookup.Builder(false)
.loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "Kazakhstan.mmdb")
.loadAsnDataFileV4(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
.loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb")
.loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "ip_v6.mmdb")
.loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_private_v4.mmdb")
.loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_private_v6.mmdb")
.loadAsnDataFile(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
.loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
.build();
// private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
// private static SnowflakeId snowflakeId = new SnowflakeId();
/**
* 解析日志,并补全
*
* @param message 原始日志
* @param message kafka Topic原始日志
* @return 补全后的日志
*/
public static String getJsonMessage(String message) {
SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class);
String serverIp = sessionRecordLog.getServer_ip();
String clientIp = sessionRecordLog.getClient_ip();
public static String dealCommonMessage(String message) {
Object object = JSONObject.parseObject(message, mapObject.getClass());
try {
sessionRecordLog.setUid(SnowflakeId.generateId());
sessionRecordLog.setServer_location(ipLookup.countryLookup(serverIp));
sessionRecordLog.setClient_location(ipLookup.cityLookupDetail(clientIp));
sessionRecordLog.setClient_asn(ipLookup.asnLookup(clientIp, true));
sessionRecordLog.setServer_asn(ipLookup.asnLookup(serverIp, true));
sessionRecordLog.setDomain(getTopDomain(sessionRecordLog.getSni(), sessionRecordLog.getHost()));
sessionRecordLog.setRecv_time(System.currentTimeMillis() / 1000);
// sessionRecordLog.setSubscribe_id(getSubscribeId(clientIp));
return JSONObject.toJSONString(sessionRecordLog);
} catch (Exception e) {
logger.error("日志解析过程出现异常", e);
return "";
}
}
for (String[] strings : jobList) {
//用到的参数的值
Object name = JsonParseUtil.getValue(object, strings[0]);
//需要补全的字段的key
String appendToKeyName = strings[1];
//需要补全的字段的值
Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
//匹配操作函数的字段
String function = strings[2];
//额外的参数的值
String param = strings[3];
/**
* 有sni通过sni获取域名有hots根据host获取域名
*
* @param sni sni
* @param host host
* @return 顶级域名
*/
private static String getTopDomain(String sni, String host) {
if (StringUtil.isNotBlank(sni)) {
return getDomain(sni);
} else if (StringUtil.isNotBlank(host)) {
return getDomain(host);
} else {
return "";
}
}
/**
* 获取用户名
*
* @param key Sip
* @return SubscribeId
*/
private static String getSubscribeId(String key) {
String sub = "";
try (Jedis jedis = RedisPollUtils.getJedis()) {
if (jedis != null) {
sub = jedis.get(key);
}
} catch (Exception e) {
logger.error("通过Redis获取用户名出现异常", e);
}
return sub;
}
/**
* 根据url截取顶级域名
*
* @param url 网站url
* @return 顶级域名
*/
private static String getDomain(String url) {
try {
Matcher matcher = WEB_PATTERN.matcher(url);
if (matcher.find()) {
return matcher.group();
functionSet(function, object, appendToKeyName, appendTo, name, param);
}
return JSONObject.toJSONString(object);
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志预处理过程出现异常");
e.printStackTrace();
return "";
}
return "";
}
public static void main(String[] args) {
String s = ipLookup.countryLookup("192.168.10.207");
System.out.println(s);
}
}
/**
* 根据schema描述对应字段进行操作的 函数集合
*
* @param function 匹配操作函数的字段
* @param object 动态POJO Object
* @param appendToKeyName 需要补全的字段的key
* @param appendTo 需要补全的字段的值
* @param name 用到的参数的值
* @param param 额外的参数的值
*/
private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) {
switch (function) {
case "current_timestamp":
if ((long) appendTo == 0L) {
JsonParseUtil.setValue(object, appendToKeyName, getCurrentTime());
}
break;
case "snowflake_id":
// JsonParseUtil.setValue(object, appendToKeyName,
// build.getSnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM));
JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId());
break;
case "geo_ip_detail":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpDetail(ipLookup, name.toString()));
}
break;
case "geo_asn":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoAsn(ipLookup, name.toString()));
}
break;
case "geo_ip_country":
if (name != null && appendTo == null) {
JsonParseUtil.setValue(object, appendToKeyName, getGeoIpCountry(ipLookup, name.toString()));
}
break;
case "get_value":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, name);
}
break;
case "if":
if (param != null) {
JsonParseUtil.setValue(object, appendToKeyName, condition(object, param));
}
break;
case "sub_domain":
if (appendTo == null && name != null) {
JsonParseUtil.setValue(object, appendToKeyName, getTopDomain(name.toString()));
}
break;
case "radius_match":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, radiusMatch(name.toString()));
}
break;
case "decode_of_base64":
if (name != null) {
JsonParseUtil.setValue(object, appendToKeyName, decodeBase64(name.toString(), isJsonValue(object, param)));
}
break;
case "flattenSpec":
if (name != null && param != null) {
JsonParseUtil.setValue(object, appendToKeyName, flattenSpec(name.toString(), isJsonValue(object, param)));
}
break;
default:
}
}
}

View File

@@ -0,0 +1,184 @@
package cn.ac.iie.utils.general;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.hbase.HBaseUtils;
import cn.ac.iie.utils.json.JsonParseUtil;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import com.zdjizhi.utils.Encodes;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
/**
* @author qidaijie
*/
class TransFunction {
private static Logger logger = Logger.getLogger(TransFunction.class);
/**
* 生成当前时间戳的操作
*/
static long getCurrentTime() {
return System.currentTimeMillis() / 1000;
}
/**
* 根据clientIp获取location信息
*
* @param ip client IP
* @return ip地址详细信息
*/
static String getGeoIpDetail(IpLookup ipLookup, String ip) {
return ipLookup.cityLookupDetail(ip);
}
/**
* 根据ip获取asn信息
*
* @param ip client/server IP
* @return ASN
*/
static String getGeoAsn(IpLookup ipLookup, String ip) {
return ipLookup.asnLookup(ip);
}
/**
* 根据ip获取country信息
*
* @param ip server IP
* @return 国家
*/
static String getGeoIpCountry(IpLookup ipLookup, String ip) {
return ipLookup.countryLookup(ip);
}
/**
* radius借助HBase补齐
*
* @param ip client IP
* @return account
*/
static String radiusMatch(String ip) {
String account = HBaseUtils.getAccount(ip.trim());
if (StringUtil.isBlank(account)) {
logger.warn("HashMap get Account is null,IP is :" + ip);
}
return account;
}
/**
* 解析顶级域名
*
* @param domain 初始域名
* @return 顶级域名
*/
static String getTopDomain(String domain) {
try {
return FormatUtils.getTopPrivateDomain(domain);
} catch (StringIndexOutOfBoundsException outException) {
logger.error("解析顶级域名异常,异常域名:" + domain, outException);
return "";
}
}
/**
* 根据编码解码base64
*
* @param message base64
* @param charset 编码
* @return 解码字符串
*/
static String decodeBase64(String message, String charset) {
String result = "";
try {
if (StringUtil.isNotBlank(message)) {
if (StringUtil.isNotBlank(charset)) {
result = Encodes.decodeBase64String(message, charset);
} else {
result = Encodes.decodeBase64String(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
}
}
} catch (Exception e) {
logger.error("解析 Base64 异常,异常信息:" + e);
}
return result;
}
/**
* 根据表达式解析json
*
* @param message json
* @param expr 解析表达式
* @return 解析结果
*/
static String flattenSpec(String message, String expr) {
String flattenResult = "";
try {
if (StringUtil.isNotBlank(expr)) {
ArrayList<String> read = JsonPath.parse(message).read(expr);
flattenResult = read.get(0);
}
} catch (ClassCastException | InvalidPathException e) {
logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误", e);
}
return flattenResult;
}
/**
* 判断是否为日志字段,是则返回对应value否则返回原始字符串
*
* @param object 内存实体类
* @param param 字段名/普通字符串
* @return JSON.Value or String
*/
static String isJsonValue(Object object, String param) {
if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
Object value = JsonParseUtil.getValue(object, param.substring(2));
if (value != null) {
return value.toString();
} else {
return "";
}
} else {
return param;
}
}
/**
* IF函数实现解析日志构建三目运算
*
* @param object 内存实体类
* @param ifParam 字段名/普通字符串
* @return resultA or resultB or ""
*/
static String condition(Object object, String ifParam) {
String result = "";
try {
String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
String direction = isJsonValue(object, norms[0]);
if (StringUtil.isNotBlank(direction)) {
if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
String resultA = isJsonValue(object, split[1]);
String resultB = isJsonValue(object, split[2]);
result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB;
}
}
} catch (Exception e) {
logger.error("IF 函数执行异常,异常信息:" + e);
e.printStackTrace();
}
return result;
}
}

View File

@@ -0,0 +1,210 @@
package cn.ac.iie.utils.hbase;
import cn.ac.iie.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
/**
* HBase 工具类
*
* @author qidaijie
*/
public class HBaseUtils {
private final static Logger logger = Logger.getLogger(HBaseUtils.class);
private static Map<String ,String > subIdMap = new ConcurrentHashMap<>(83334);
// private static Map<String, String> subId/Map = new HashMap<>(83334);
private static Connection connection;
private static Long time;
private static String zookeeperIp;
private static String hBaseTable;
private static HBaseUtils hBaseUtils;
private static void getHBaseInstance() {
hBaseUtils = new HBaseUtils();
}
/**
* 构造函数-新
*/
private HBaseUtils() {
zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS;
hBaseTable = FlowWriteConfig.HBASE_TABLE_NAME;
//获取连接
getHBaseConn();
//拉取所有
getAll();
//定时更新
updateHabaseCache();
}
private static void getHBaseConn() {
try {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
configuration.set("hbase.zookeeper.quorum", zookeeperIp);
configuration.set("hbase.client.retries.number", "3");
configuration.set("hbase.bulkload.retries.number", "3");
configuration.set("zookeeper.recovery.retry", "3");
connection = ConnectionFactory.createConnection(configuration);
time = System.currentTimeMillis();
logger.warn("HBaseUtils get HBase connection,now to getAll().");
// getAll();
} catch (IOException ioe) {
logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
ioe.printStackTrace();
} catch (Exception e) {
logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
e.printStackTrace();
}
}
/**
* 更新变量
*/
public static void change() {
if (hBaseUtils == null) {
getHBaseInstance();
}
long nowTime = System.currentTimeMillis();
timestampsFilter(time - 1000, nowTime + 500);
}
public static void main(String[] args) {
change();
}
/**
* 获取变更内容
*
* @param startTime 开始时间
* @param endTime 结束时间
*/
private static void timestampsFilter(Long startTime, Long endTime) {
Long begin = System.currentTimeMillis();
Table table = null;
ResultScanner scanner = null;
Scan scan2 = new Scan();
try {
table = connection.getTable(TableName.valueOf("sub:" + hBaseTable));
scan2.setTimeRange(startTime, endTime);
scanner = table.getScanner(scan2);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
String key = Bytes.toString(CellUtil.cloneRow(cell)).trim();
String value = Bytes.toString(CellUtil.cloneValue(cell)).trim();
if (subIdMap.containsKey(key)) {
if (!value.equals(subIdMap.get(key))) {
subIdMap.put(key, value);
}
} else {
subIdMap.put(key, value);
}
}
}
Long end = System.currentTimeMillis();
logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size());
logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + startTime + ",EndTime: " + endTime);
time = endTime;
} catch (IOException ioe) {
logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<===");
ioe.printStackTrace();
} catch (Exception e) {
logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<===");
e.printStackTrace();
} finally {
if (scanner != null) {
scanner.close();
}
if (table != null) {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 获取所有的 key value
*/
private static void getAll() {
long begin = System.currentTimeMillis();
try {
Table table = connection.getTable(TableName.valueOf("sub:" + hBaseTable));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
}
}
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin));
scanner.close();
} catch (IOException ioe) {
logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
ioe.printStackTrace();
} catch (Exception e) {
logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
e.printStackTrace();
}
}
/**
* 验证定时器,每隔一段时间验证一次-验证获取新的Cookie
*/
private void updateHabaseCache() {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
change();
} catch (Exception e) {
logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
e.printStackTrace();
}
}
}, 1, 1000 * FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);//起始1ms,以后每隔60s
}
/**
* 获取 account
*
* @param clientIp client_ip
* @return account
*/
public static String getAccount(String clientIp) {
if (hBaseUtils == null) {
getHBaseInstance();
}
return subIdMap.get(clientIp);
}
}

View File

@@ -0,0 +1,55 @@
package cn.ac.iie.utils.http;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* 获取网关schema的工具类
*
* @author qidaijie
*/
public class HttpClientUtil {
/**
* 请求网关获取schema
* @param http 网关url
* @return schema
*/
public static String requestByGetMethod(String http) {
CloseableHttpClient httpClient = HttpClients.createDefault();
StringBuilder entityStringBuilder = null;
try {
HttpGet get = new HttpGet(http);
try (CloseableHttpResponse httpResponse = httpClient.execute(get)) {
HttpEntity entity = httpResponse.getEntity();
entityStringBuilder = new StringBuilder();
if (null != entity) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
String line = null;
while ((line = bufferedReader.readLine()) != null) {
entityStringBuilder.append(line);
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (httpClient != null) {
httpClient.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return entityStringBuilder.toString();
}
}

View File

@@ -45,6 +45,22 @@ public class InfluxDbUtils {
}
}
/**
* 记录对准失败次数-即内存中没有对应的key
*
* @param failure 对准失败量
*/
public static void sendHBaseFailure(int failure) {
if (failure != 0) {
InfluxDB client = InfluxDBFactory.connect(FlowWriteConfig.INFLUX_IP, FlowWriteConfig.INFLUX_USERNAME, FlowWriteConfig.INFLUX_PASSWORD);
Point point1 = Point.measurement("sendHBaseFailure")
.tag("topic", FlowWriteConfig.KAFKA_TOPIC)
.field("failure", failure)
.build();
client.write("BusinessMonitor", "", point1);
}
}
/**
* 获取本机IP
*

View File

@@ -0,0 +1,240 @@
package cn.ac.iie.utils.json;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.utils.StringUtil;
import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;
import org.apache.log4j.Logger;
import scala.annotation.meta.field;
import java.util.*;
/**
* 使用FastJson解析json的工具类
*
* @author qidaijie
*/
public class JsonParseUtil {
private static Logger logger = Logger.getLogger(JsonParseUtil.class);
/**
* 模式匹配,给定一个类型字符串返回一个类类型
*
* @param type 类型
* @return 类类型
*/
private static Class getClassName(String type) {
Class clazz;
switch (type) {
case "int":
clazz = Integer.class;
break;
case "String":
clazz = String.class;
break;
case "long":
clazz = long.class;
break;
case "Integer":
clazz = Integer.class;
break;
case "double":
clazz = double.class;
break;
case "float":
clazz = float.class;
break;
case "char":
clazz = char.class;
break;
case "byte":
clazz = byte.class;
break;
case "boolean":
clazz = boolean.class;
break;
case "short":
clazz = short.class;
break;
default:
clazz = String.class;
}
return clazz;
}
/**
* 根据反射生成对象的方法
*
* @param properties 反射类用的map
* @return 生成的Object类型的对象
*/
public static Object generateObject(Map properties) {
BeanGenerator generator = new BeanGenerator();
Set keySet = properties.keySet();
for (Object aKeySet : keySet) {
String key = (String) aKeySet;
generator.addProperty(key, (Class) properties.get(key));
}
return generator.create();
}
/**
* 获取属性值的方法
*
* @param obj 对象
* @param property key
* @return 属性的值
*/
public static Object getValue(Object obj, String property) {
BeanMap beanMap = BeanMap.create(obj);
return beanMap.get(property);
}
/**
* 更新属性值的方法
*
* @param obj 对象
* @param property 更新的key
* @param value 更新的值
*/
public static void setValue(Object obj, String property, Object value) {
try {
BeanMap beanMap = BeanMap.create(obj);
beanMap.put(property, value);
} catch (ClassCastException e) {
logger.error("赋予实体类错误类型数据", e);
}
}
/**
* 通过获取String类型的网关schema链接来获取map用于生成一个Object类型的对象
*
* @param http 网关schema地址
* @return 用于反射生成schema类型的对象的一个map集合
*/
public static HashMap<String, Class> getMapFromHttp(String http) {
HashMap<String, Class> map = new HashMap<>();
String schema = HttpClientUtil.requestByGetMethod(http);
Object data = JSON.parseObject(schema).get("data");
//获取fields并转化为数组数组的每个元素都是一个name doc type
JSONObject schemaJson = JSON.parseObject(data.toString());
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
String filedStr = field.toString();
if (checkKeepField(filedStr)) {
String name = JsonPath.read(filedStr, "$.name").toString();
String type = JsonPath.read(filedStr, "$.type").toString();
// boolean contains = type.contains("[");
// if (contains) {
// map.put(name, Integer.class);
// } else {
//组合用来生成实体类的map
map.put(name, getClassName(type));
// }
}
}
return map;
}
/**
* 判断字段是否需要保留
*
* @param message 单个field-json
* @return true or false
*/
private static boolean checkKeepField(String message) {
boolean isKeepField = true;
boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
if (isHiveDoc) {
boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
if (isHiveVi) {
String visibility = JsonPath.read(message, "$.doc.visibility").toString();
if (FlowWriteConfig.VISIBILITY.equals(visibility)) {
isKeepField = false;
}
}
}
return isKeepField;
}
/**
* 根据http链接获取schema解析之后返回一个任务列表 (useList toList funcList paramlist)
*
* @param http 网关url
* @return 任务列表
*/
public static ArrayList<String[]> getJobListFromHttp(String http) {
ArrayList<String[]> list = new ArrayList<>();
String schema = HttpClientUtil.requestByGetMethod(http);
//解析data
Object data = JSON.parseObject(schema).get("data");
//获取fields并转化为数组数组的每个元素都是一个name doc type
JSONObject schemaJson = JSON.parseObject(data.toString());
JSONArray fields = (JSONArray) schemaJson.get("fields");
for (Object field : fields) {
if (JSON.parseObject(field.toString()).containsKey("doc")) {
Object doc = JSON.parseObject(field.toString()).get("doc");
if (JSON.parseObject(doc.toString()).containsKey("format")) {
String name = JSON.parseObject(field.toString()).get("name").toString();
Object format = JSON.parseObject(doc.toString()).get("format");
JSONObject formatObject = JSON.parseObject(format.toString());
String functions = formatObject.get("functions").toString();
String appendTo = null;
String params = null;
if (formatObject.containsKey("appendTo")) {
appendTo = formatObject.get("appendTo").toString();
}
if (formatObject.containsKey("param")) {
params = formatObject.get("param").toString();
}
if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) {
String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
for (int i = 0; i < functionArray.length; i++) {
list.add(new String[]{name, appendToArray[i], functionArray[i], null});
}
} else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) {
String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER);
for (int i = 0; i < functionArray.length; i++) {
list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]});
}
} else {
list.add(new String[]{name, name, functions, params});
}
}
}
}
return list;
}
}

View File

@@ -1,11 +1,10 @@
package cn.ac.iie.utils.kafka;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.influxdb.InfluxDbUtils;
import cn.ac.iie.common.KafkaProConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.log4j.Logger;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
@@ -16,8 +15,8 @@ import java.util.Properties;
* @create 2018-08-13 15:11
*/
public class KafkaLogNtc {
private static Logger logger = Logger.getLogger(KafkaLogNtc.class);
public class KafkaLogSend {
private static Logger logger = Logger.getLogger(KafkaLogSend.class);
/**
* kafka生产者用于向kafka中发送消息
@@ -27,17 +26,17 @@ public class KafkaLogNtc {
/**
* kafka生产者适配器单例用来代理kafka生产者发送消息
*/
private static KafkaLogNtc kafkaLogNtc;
private static KafkaLogSend kafkaLogSend;
private KafkaLogNtc() {
private KafkaLogSend() {
initKafkaProducer();
}
public static KafkaLogNtc getInstance() {
if (kafkaLogNtc == null) {
kafkaLogNtc = new KafkaLogNtc();
public static KafkaLogSend getInstance() {
if (kafkaLogSend == null) {
kafkaLogSend = new KafkaLogSend();
}
return kafkaLogNtc;
return kafkaLogSend;
}
@@ -59,7 +58,7 @@ public class KafkaLogNtc {
}
}
kafkaProducer.flush();
logger.warn("Log sent to National Center successfully!!!!!");
logger.debug("Log sent to National Center successfully!!!!!");
}
/**
@@ -67,15 +66,25 @@ public class KafkaLogNtc {
*/
private void initKafkaProducer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS);
properties.put("bootstrap.servers", FlowWriteConfig.OUTPUT_KAFKA_SERVERS);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
properties.put("linger.ms", "2");
properties.put("request.timeout.ms", 60000);
properties.put("batch.size", 262144);
properties.put("buffer.memory", 33554432);
properties.put("compression.type", "snappy");
properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
properties.put("retries", KafkaProConfig.RETRIES);
properties.put("linger.ms", KafkaProConfig.LINGER_MS);
properties.put("request.timeout.ms", KafkaProConfig.REQUEST_TIMEOUT_MS);
properties.put("batch.size", KafkaProConfig.BATCH_SIZE);
properties.put("buffer.memory", KafkaProConfig.BUFFER_MEMORY);
properties.put("max.request.size", KafkaProConfig.MAX_REQUEST_SIZE);
// properties.put("compression.type", FlowWriteConfig.KAFKA_COMPRESSION_TYPE);
/**
* kafka限流配置-20201117
*/
properties.put(ProducerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.PRODUCER_CLIENT_ID);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
kafkaProducer = new KafkaProducer<>(properties);
}

View File

@@ -1,79 +0,0 @@
package cn.ac.iie.utils.redis;
import cn.ac.iie.common.FlowWriteConfig;
import org.apache.log4j.Logger;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;
/**
* 预用于对准IP对应的用户名的 Redis连接池
*
* @author my
* @date 2018-07-04
*/
public final class RedisClusterUtils {
private static final Logger logger = Logger.getLogger(RedisClusterUtils.class);
private static JedisCluster jedisCluster;
private static Properties props = new Properties();
static {
try {
String redisConfigFile = "redis_config.properties";
props.load(RedisClusterUtils.class.getClassLoader().getResourceAsStream(redisConfigFile));
} catch (IOException e) {
props = null;
logger.error("加载Redis配置文件失败", e);
}
}
/**
* 不允许通过new创建该类的实例
*/
private RedisClusterUtils() {
}
/**
* 初始化Redis连接池
*/
private static JedisCluster getJedisCluster() {
if (jedisCluster == null) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXACTIVE)));
poolConfig.setMaxIdle(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXIDLE)));
poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXWAIT)));
poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONRETURN)));
poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONBORROW)));
Set<HostAndPort> nodes = new LinkedHashSet<HostAndPort>();
for (String port : props.getProperty(FlowWriteConfig.REDIS_PORT).split(FlowWriteConfig.SEGMENTATION)) {
for (String ip : props.getProperty(FlowWriteConfig.REDIS_IP).split(FlowWriteConfig.SEGMENTATION)) {
nodes.add(new HostAndPort(ip, Integer.parseInt(port)));
}
}
jedisCluster = new JedisCluster(nodes, poolConfig);
}
return jedisCluster;
}
/**
* 获取用户名
*
* @param key service_ip
* @return Subscribe_id
*/
public static String get(String key) {
String s = key.split("\\.")[0];
if (!FlowWriteConfig.CHECK_IP_SCOPE.contains(s)) {
jedisCluster = getJedisCluster();
return jedisCluster.get(key);
}
return "";
}
}

View File

@@ -1,115 +0,0 @@
package cn.ac.iie.utils.redis;
import cn.ac.iie.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import org.apache.commons.lang3.RandomUtils;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.Properties;
/**
* @author qidaijie
*/
public class RedisPollUtils {
private static final Logger logger = Logger.getLogger(RedisPollUtils.class);
private static JedisPool jedisPool = null;
private static Properties props = new Properties();
private RedisPollUtils() {
}
static {
initialPool();
}
/**
* 初始化Redis连接池
*/
private static void initialPool() {
try {
//加载连接池配置文件
props.load(RedisPollUtils.class.getClassLoader().getResourceAsStream("redis_config.properties"));
// 创建jedis池配置实例
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXACTIVE)));
poolConfig.setMaxIdle(Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXIDLE)));
poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_MAXWAIT)));
poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONRETURN)));
poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty(FlowWriteConfig.REDIS_POOL_TESTONBORROW)));
// 根据配置实例化jedis池
jedisPool = new JedisPool(poolConfig, props.getProperty(FlowWriteConfig.REDIS_IP),
Integer.valueOf(props.getProperty(FlowWriteConfig.REDIS_PORT)));
} catch (Exception e) {
logger.error("Redis连接池初始化错误", e);
}
}
/**
* 获取Jedis实例
*
* @return Jedis实例
*/
public static Jedis getJedis() {
Jedis jedis = null;
try {
if (jedisPool == null) {
initialPool();
}
jedis = jedisPool.getResource();
} catch (Exception e) {
logger.error("Redis连接池错误,无法获取连接", e);
}
return jedis;
}
// /**
// * @param key redis key
// * @return value
// */
// public static Integer getWorkerId(String key) {
// int workId = 0;
// int maxId = 32;
// try (Jedis jedis = RedisPollUtils.getJedis()) {
// if (jedis != null) {
// String work = jedis.get(key);
// if (StringUtil.isBlank(work)) {
// jedis.set(key, "0");
// } else {
// workId = Integer.parseInt(work);
// }
// if (workId < maxId) {
// jedis.set(key, String.valueOf(workId + 1));
// } else {
// workId = 0;
// jedis.set(key, "1");
// }
// }
// } catch (Exception e) {
// logger.error("通过Redis获取用户名出现异常", e);
// workId = RandomUtils.nextInt(0, 31);
// }
// return workId;
// }
public static Integer getWorkerId(String key) {
int workId = 0;
try (Jedis jedis = RedisPollUtils.getJedis()) {
if (jedis != null) {
workId = Integer.parseInt(jedis.get(key));
jedis.set(key, String.valueOf(workId + 2));
logger.error("\n工作id是" + workId + "\n");
}
} catch (Exception e) {
logger.error("通过Redis获取用户名出现异常", e);
workId = RandomUtils.nextInt(0, 31);
}
return workId;
}
}

View File

@@ -9,15 +9,15 @@ import java.util.Properties;
public final class FlowWriteConfigurations {
// private static Properties propCommon = new Properties();
private static Properties propKafka = new Properties();
private static Properties propService = new Properties();
public static String getStringProperty(Integer type, String key) {
if (type == 0) {
return propService.getProperty(key);
// } else if (type == 1) {
// return propCommon.getProperty(key);
} else if (type == 1) {
return propKafka.getProperty(key);
} else {
return null;
}
@@ -27,8 +27,8 @@ public final class FlowWriteConfigurations {
public static Integer getIntProperty(Integer type, String key) {
if (type == 0) {
return Integer.parseInt(propService.getProperty(key));
// } else if (type == 1) {
// return Integer.parseInt(propCommon.getProperty(key));
} else if (type == 1) {
return Integer.parseInt(propKafka.getProperty(key));
} else {
return null;
}
@@ -37,8 +37,8 @@ public final class FlowWriteConfigurations {
public static Long getLongProperty(Integer type, String key) {
if (type == 0) {
return Long.parseLong(propService.getProperty(key));
// } else if (type == 1) {
// return Long.parseLong(propCommon.getProperty(key));
} else if (type == 1) {
return Long.parseLong(propKafka.getProperty(key));
} else {
return null;
}
@@ -47,8 +47,8 @@ public final class FlowWriteConfigurations {
public static Boolean getBooleanProperty(Integer type, String key) {
if (type == 0) {
return "true".equals(propService.getProperty(key).toLowerCase().trim());
// } else if (type == 1) {
// return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
} else if (type == 1) {
return "true".equals(propKafka.getProperty(key).toLowerCase().trim());
} else {
return null;
}
@@ -57,8 +57,9 @@ public final class FlowWriteConfigurations {
static {
try {
propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("kafka_config.properties"));
} catch (Exception e) {
// propCommon = null;
propKafka = null;
propService = null;
}
}

View File

@@ -1,7 +1,5 @@
package cn.ac.iie.utils.zookeeper;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.system.SnowflakeId;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
@@ -188,32 +186,4 @@ public class DistributedLock implements Lock, Watcher {
}
}
public static void main(String[] args) {
ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
Runnable runnable = new Runnable() {
@Override
public void run() {
DistributedLock lock = null;
try {
lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
lock.lock();
// System.out.println(SnowflakeId.generateId());
System.out.println(1);
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock != null) {
lock.unlock();
}
}
}
};
for (int i = 0; i < 10; i++) {
Thread t = new Thread(runnable);
t.start();
}
}
}

View File

@@ -1,7 +1,5 @@
package cn.ac.iie.utils.zookeeper;
import cn.ac.iie.common.FlowWriteConfig;
import org.apache.commons.lang3.RandomUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
@@ -13,9 +11,12 @@ import java.util.concurrent.CountDownLatch;
/**
* @author qidaijie
* @Package cn.ac.iie.utils.zookeeper
* @Description:
* @date 2020/11/1411:28
*/
public class ZookeeperUtils implements Watcher {
private static Logger logger = Logger.getLogger(ZookeeperUtils.class);
private static Logger logger = Logger.getLogger(com.zdjizhi.utils.ZookeeperUtils.class);
private ZooKeeper zookeeper;
@@ -25,7 +26,7 @@ public class ZookeeperUtils implements Watcher {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
@@ -36,15 +37,14 @@ public class ZookeeperUtils implements Watcher {
*
* @param path 节点路径
*/
public int modifyNode(String path) {
createNode("/Snowflake", null, ZooDefs.Ids.OPEN_ACL_UNSAFE);
createNode("/Snowflake/" + FlowWriteConfig.KAFKA_TOPIC, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
int workerId;
public int modifyNode(String path, String zookeeperIp) {
createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp);
int workerId = 0;
try {
connectZookeeper();
connectZookeeper(zookeeperIp);
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
if (workerId > 55) {
if (workerId > 63) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
} else {
@@ -56,22 +56,22 @@ public class ZookeeperUtils implements Watcher {
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
workerId = RandomUtils.nextInt(56, 63);
logger.error("modify error Can't modify," + e.getMessage());
} finally {
closeConn();
}
logger.error("工作ID是" + workerId);
logger.warn("workerID is" + workerId);
return workerId;
}
/**
* 连接zookeeper
*
* @param host 地址
*/
private void connectZookeeper() {
public void connectZookeeper(String host) {
try {
zookeeper = new ZooKeeper(FlowWriteConfig.ZOOKEEPER_SERVERS, SESSION_TIME_OUT, this);
zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
@@ -81,7 +81,7 @@ public class ZookeeperUtils implements Watcher {
/**
* 关闭连接
*/
private void closeConn() {
public void closeConn() {
try {
if (zookeeper != null) {
zookeeper.close();
@@ -97,7 +97,7 @@ public class ZookeeperUtils implements Watcher {
* @param path 节点路径
* @return 内容/异常null
*/
private String getNodeDate(String path) {
public String getNodeDate(String path) {
String result = null;
Stat stat = new Stat();
try {
@@ -115,14 +115,18 @@ public class ZookeeperUtils implements Watcher {
* @param date 节点所存储的数据的byte[]
* @param acls 控制权限策略
*/
private void createNode(String path, byte[] date, List<ACL> acls) {
public void createNode(String path, byte[] date, List<ACL> acls, String zookeeperIp) {
try {
connectZookeeper();
connectZookeeper(zookeeperIp);
Stat exists = zookeeper.exists(path, true);
if (exists == null) {
Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true);
if (existsSnowflakeld == null) {
zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT);
}
zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
} else {
logger.warn("Node already exists!,Don't need to create");
logger.warn("Node already exists ! Don't need to create");
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
@@ -130,5 +134,4 @@ public class ZookeeperUtils implements Watcher {
closeConn();
}
}
}

View File

@@ -1,23 +0,0 @@
#Log4j
log4j.rootLogger=info,console,file
# 控制台日志设置
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=info
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# 文件日志设置
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=error
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
#路径请用相对路径,做好相关测试输出到应用目下
log4j.appender.file.file=galaxy-name.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
#MyBatis 配置com.nis.web.dao是mybatis接口所在包
log4j.logger.com.nis.web.dao=debug
#bonecp数据源配置
log4j.category.com.jolbox=debug,console