From d83f648292f901f10349dcb6410d8ee9de926a5e Mon Sep 17 00:00:00 2001 From: qidaijie Date: Tue, 12 Nov 2019 16:02:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E9=85=8D=E7=BD=AE=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- properties/service_flow_config.properties | 16 ++++--- .../ac/iie/bean/CollectProtocolRecordLog.java | 15 ++++--- .../cn/ac/iie/bean/ProxySessionRecordLog.java | 8 +++- .../ac/iie/bean/PublicSessionRecordLog.java | 8 +++- .../ac/iie/bean/RadiusSessionRecordLog.java | 11 +++-- .../cn/ac/iie/bean/SecurityPolicyLog.java | 15 ++++--- .../java/cn/ac/iie/bolt/NtcLogSendBolt.java | 16 +++---- .../collectProtocol/CollectCompletedBolt.java | 13 +++--- .../iie/bolt/proxy/ProxyCompletionBolt.java | 11 ++--- .../iie/bolt/radius/RadiusCompletionBolt.java | 7 ++- .../bolt/security/SecurityCompletionBolt.java | 12 ++--- .../cn/ac/iie/common/FlowWriteConfig.java | 1 + .../ac/iie/topology/LogFlowWriteTopology.java | 24 +++------- .../ac/iie/utils/general/TransFormUtils.java | 44 ++++++++++--------- 14 files changed, 112 insertions(+), 89 deletions(-) diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index e07af38..fb65910 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -17,7 +17,7 @@ auto.offset.reset=latest kafka.topic=SECURITY-POLICY-LOG #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=session-completion-program +group.id=security-policy-191112 #输出topic results.output.topic=SECURITY-POLICY-COMPLETED-LOG @@ -35,16 +35,20 @@ datacenter.bolt.parallelism=1 kafka.bolt.parallelism=1 #定位库地址 -#ip.library=/home/ceiec/topology/dat/ -ip.library=D:/dat/ +ip.library=/home/ceiec/topology/dat/ +#ip.library=D:/dat/ + #kafka批量条数 -batch.insert.num=5000 +batch.insert.num=2000 #数据中心(UID) -data.center.id.num=10 +data.center.id.num=15 #tick时钟频率 -topology.tick.tuple.freq.secs=60 +topology.tick.tuple.freq.secs=5 + +#hbase 更新时间 +hbase.tick.tuple.freq.secs=60 #当bolt性能受限时,限制spout接收速度,理论看ack开启才有效 topology.config.max.spout.pending=150000 diff --git a/src/main/java/cn/ac/iie/bean/CollectProtocolRecordLog.java b/src/main/java/cn/ac/iie/bean/CollectProtocolRecordLog.java index 6131846..43b7d00 100644 --- a/src/main/java/cn/ac/iie/bean/CollectProtocolRecordLog.java +++ b/src/main/java/cn/ac/iie/bean/CollectProtocolRecordLog.java @@ -1,6 +1,11 @@ package cn.ac.iie.bean; -public class CollectProtocolRecordLog extends PublicSessionRecordLog{ +/** + * 除radius之外 + * + * @author qidaijie + */ +public class CollectProtocolRecordLog extends PublicSessionRecordLog { //TODO HTTP协议属性 21 @@ -77,23 +82,23 @@ public class CollectProtocolRecordLog extends PublicSessionRecordLog{ private String ssl_client_side_version; private String ssl_error; -//TODO FTP协议属性 2 + //TODO FTP协议属性 2 private String ftp_url; private String ftp_content; -//TODO BGP协议属性 3 + //TODO BGP协议属性 3 private int bgp_type; private String bgp_as_num; private String bgp_route; -//TODO VOIP协议属性 4 + //TODO VOIP协议属性 4 private String voip_calling_account; private String voip_called_account; private String voip_calling_number; private String voip_called_number; -//TODO STREAMING_MEDIA协议属性 2 + //TODO STREAMING_MEDIA协议属性 2 private String streaming_media_url; private String streaming_media_protocol; diff --git a/src/main/java/cn/ac/iie/bean/ProxySessionRecordLog.java b/src/main/java/cn/ac/iie/bean/ProxySessionRecordLog.java index 53b8bd7..441205d 100644 --- a/src/main/java/cn/ac/iie/bean/ProxySessionRecordLog.java +++ b/src/main/java/cn/ac/iie/bean/ProxySessionRecordLog.java @@ -1,6 +1,10 @@ package cn.ac.iie.bean; -import com.alibaba.fastjson.annotation.JSONField; -import com.alibaba.fastjson.support.spring.annotation.FastJsonFilter; + + + +/** + * @author qidaijie + */ public class ProxySessionRecordLog extends PublicSessionRecordLog{ diff --git a/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java b/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java index 7d853d4..d9e6ced 100644 --- a/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java +++ b/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java @@ -1,11 +1,15 @@ package cn.ac.iie.bean; -import com.alibaba.fastjson.annotation.JSONField; -import com.alibaba.fastjson.support.spring.annotation.FastJsonFilter; +/** + * 公共类 + * + * @author qidaijie + */ public class PublicSessionRecordLog { //TODO 基础属性 40 int22 + private int common_service; private int common_direction; private long common_recv_time; diff --git a/src/main/java/cn/ac/iie/bean/RadiusSessionRecordLog.java b/src/main/java/cn/ac/iie/bean/RadiusSessionRecordLog.java index d467afc..a530da6 100644 --- a/src/main/java/cn/ac/iie/bean/RadiusSessionRecordLog.java +++ b/src/main/java/cn/ac/iie/bean/RadiusSessionRecordLog.java @@ -1,7 +1,12 @@ package cn.ac.iie.bean; -import com.alibaba.fastjson.annotation.JSONField; -import com.alibaba.fastjson.support.spring.annotation.FastJsonFilter; -public class RadiusSessionRecordLog extends PublicSessionRecordLog{ + + +/** + * Radius 日志 + * + * @author qidaijie + */ +public class RadiusSessionRecordLog extends PublicSessionRecordLog { //TODO RADIUS协议属性 4 diff --git a/src/main/java/cn/ac/iie/bean/SecurityPolicyLog.java b/src/main/java/cn/ac/iie/bean/SecurityPolicyLog.java index 775f1c7..998182f 100644 --- a/src/main/java/cn/ac/iie/bean/SecurityPolicyLog.java +++ b/src/main/java/cn/ac/iie/bean/SecurityPolicyLog.java @@ -1,6 +1,11 @@ package cn.ac.iie.bean; -public class SecurityPolicyLog extends PublicSessionRecordLog{ +/** + * 策略 + * + * @author qidaijie + */ +public class SecurityPolicyLog extends PublicSessionRecordLog { //TODO HTTP协议属性 21 @@ -77,23 +82,23 @@ public class SecurityPolicyLog extends PublicSessionRecordLog{ private String ssl_client_side_version; private String ssl_error; -//TODO FTP协议属性 2 + //TODO FTP协议属性 2 private String ftp_url; private String ftp_content; -//TODO BGP协议属性 3 + //TODO BGP协议属性 3 private int bgp_type; private String bgp_as_num; private String bgp_route; -//TODO VOIP协议属性 4 + //TODO VOIP协议属性 4 private String voip_calling_account; private String voip_called_account; private String voip_calling_number; private String voip_called_number; -//TODO STREAMING_MEDIA协议属性 2 + //TODO STREAMING_MEDIA协议属性 2 private String streaming_media_url; private String streaming_media_protocol; diff --git a/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java b/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java index e260447..da4786b 100644 --- a/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java +++ b/src/main/java/cn/ac/iie/bolt/NtcLogSendBolt.java @@ -24,11 +24,11 @@ import java.util.Map; * @date 2018/8/14 */ public class NtcLogSendBolt extends BaseBasicBolt { - private static final long serialVersionUID = 3940515789830317517L; + private static final long serialVersionUID = -3663610927224396615L; private static Logger logger = Logger.getLogger(NtcLogSendBolt.class); private List list; private KafkaLogNtc kafkaLogNtc; - private static long successfulSum = 0; +// private static long successfulSum = 0; @Override @@ -43,11 +43,11 @@ public class NtcLogSendBolt extends BaseBasicBolt { if (TupleUtils.isTick(tuple)) { if (list.size() != 0) { kafkaLogNtc.sendMessage(list); - successfulSum += list.size(); +// successfulSum += list.size(); list.clear(); } - basicOutputCollector.emit(new Values(successfulSum)); - successfulSum = 0L; +// basicOutputCollector.emit(new Values(successfulSum)); +// successfulSum = 0L; } else { String message = tuple.getString(0); if (StringUtil.isNotBlank(message)) { @@ -55,12 +55,12 @@ public class NtcLogSendBolt extends BaseBasicBolt { } if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) { kafkaLogNtc.sendMessage(list); - successfulSum += list.size(); +// successfulSum += list.size(); list.clear(); } } } catch (Exception e) { - logger.error("日志发送Kafka过程出现异常"); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常"); e.printStackTrace(); } } @@ -74,7 +74,7 @@ public class NtcLogSendBolt extends BaseBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declare(new Fields("suc")); +// outputFieldsDeclarer.declare(new Fields("suc")); } } diff --git a/src/main/java/cn/ac/iie/bolt/collectProtocol/CollectCompletedBolt.java b/src/main/java/cn/ac/iie/bolt/collectProtocol/CollectCompletedBolt.java index 1cd40fa..c619be1 100644 --- a/src/main/java/cn/ac/iie/bolt/collectProtocol/CollectCompletedBolt.java +++ b/src/main/java/cn/ac/iie/bolt/collectProtocol/CollectCompletedBolt.java @@ -1,4 +1,4 @@ -package cn.ac.iie.bolt.collectProtocol; +package cn.ac.iie.bolt.collectprotocol; import cn.ac.iie.common.FlowWriteConfig; import cn.ac.iie.utils.system.TupleUtils; @@ -23,10 +23,10 @@ import static cn.ac.iie.utils.hbase.HbaseUtils.change; * * @author qidaijie */ -@SuppressWarnings("all") + public class CollectCompletedBolt extends BaseBasicBolt { - private static final long serialVersionUID = -105915167013846589L; private final static Logger logger = Logger.getLogger(CollectCompletedBolt.class); + private static final long serialVersionUID = 4682827168247333522L; @Override public void prepare(Map stormConf, TopologyContext context) { @@ -38,14 +38,15 @@ public class CollectCompletedBolt extends BaseBasicBolt { try { if (TupleUtils.isTick(tuple)) { change(); - }else { + } else { String message = tuple.getString(0); if (StringUtil.isNotBlank(message)) { basicOutputCollector.emit(new Values(getCollectProtocolMessage(message))); } } } catch (Exception e) { - logger.error("接收解析过程出现异常", e); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); + e.printStackTrace(); } } @@ -53,7 +54,7 @@ public class CollectCompletedBolt extends BaseBasicBolt { public Map getComponentConfiguration() { Map conf = new HashMap(16); conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, - FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS); return conf; } diff --git a/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java index 123095a..838702f 100644 --- a/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java +++ b/src/main/java/cn/ac/iie/bolt/proxy/ProxyCompletionBolt.java @@ -23,10 +23,10 @@ import static cn.ac.iie.utils.hbase.HbaseUtils.change; * * @author qidaijie */ -@SuppressWarnings("all") + public class ProxyCompletionBolt extends BaseBasicBolt { - private static final long serialVersionUID = -1059151670838465894L; private final static Logger logger = Logger.getLogger(ProxyCompletionBolt.class); + private static final long serialVersionUID = 6097654428594885032L; @Override public void prepare(Map stormConf, TopologyContext context) { @@ -38,14 +38,15 @@ public class ProxyCompletionBolt extends BaseBasicBolt { try { if (TupleUtils.isTick(tuple)) { change(); - }else { + } else { String message = tuple.getString(0); if (StringUtil.isNotBlank(message)) { basicOutputCollector.emit(new Values(getProxyMessage(message))); } } } catch (Exception e) { - logger.error("接收解析过程出现异常", e); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); + e.printStackTrace(); } } @@ -53,7 +54,7 @@ public class ProxyCompletionBolt extends BaseBasicBolt { public Map getComponentConfiguration() { Map conf = new HashMap(16); conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, - FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS); return conf; } diff --git a/src/main/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java index 9b6ebde..7771e74 100644 --- a/src/main/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java +++ b/src/main/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java @@ -1,5 +1,6 @@ package cn.ac.iie.bolt.radius; +import cn.ac.iie.common.FlowWriteConfig; import com.zdjizhi.utils.StringUtil; import org.apache.log4j.Logger; import org.apache.storm.task.TopologyContext; @@ -20,8 +21,9 @@ import static cn.ac.iie.utils.general.TransFormUtils.getRadiusMessage; * @author qidaijie */ public class RadiusCompletionBolt extends BaseBasicBolt { - private static final long serialVersionUID = -3657802387129063952L; + private final static Logger logger = Logger.getLogger(RadiusCompletionBolt.class); + private static final long serialVersionUID = -3657802387129063952L; @Override public void prepare(Map stormConf, TopologyContext context) { @@ -36,7 +38,8 @@ public class RadiusCompletionBolt extends BaseBasicBolt { basicOutputCollector.emit(new Values(getRadiusMessage(message))); } } catch (Exception e) { - logger.error("接收解析过程出现异常", e); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); + e.printStackTrace(); } } diff --git a/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java b/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java index ebb33b7..4947ccd 100644 --- a/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java +++ b/src/main/java/cn/ac/iie/bolt/security/SecurityCompletionBolt.java @@ -23,10 +23,11 @@ import static cn.ac.iie.utils.general.TransFormUtils.getSecurityMessage; * * @author qidaijie */ -@SuppressWarnings("all") + public class SecurityCompletionBolt extends BaseBasicBolt { - private static final long serialVersionUID = -1059151770138464874L; + private final static Logger logger = Logger.getLogger(SecurityCompletionBolt.class); + private static final long serialVersionUID = -2380858260054733989L; @Override public void prepare(Map stormConf, TopologyContext context) { @@ -38,14 +39,15 @@ public class SecurityCompletionBolt extends BaseBasicBolt { try { if (TupleUtils.isTick(tuple)) { HbaseUtils.change(); - }else { + } else { String message = tuple.getString(0); if (StringUtil.isNotBlank(message)) { basicOutputCollector.emit(new Values(getSecurityMessage(message))); } } } catch (Exception e) { - logger.error("接收解析过程出现异常", e); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常"); + e.printStackTrace(); } } @@ -53,7 +55,7 @@ public class SecurityCompletionBolt extends BaseBasicBolt { public Map getComponentConfiguration() { Map conf = new HashMap(16); conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, - FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS); return conf; } diff --git a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java index 97db894..3313144 100644 --- a/src/main/java/cn/ac/iie/common/FlowWriteConfig.java +++ b/src/main/java/cn/ac/iie/common/FlowWriteConfig.java @@ -20,6 +20,7 @@ public class FlowWriteConfig { public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers"); public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism"); public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs"); + public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending"); public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks"); public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time"); diff --git a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java index 58859ae..639a523 100644 --- a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java +++ b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java @@ -1,9 +1,8 @@ package cn.ac.iie.topology; -import cn.ac.iie.bean.CollectProtocolRecordLog; import cn.ac.iie.bolt.NtcLogSendBolt; -import cn.ac.iie.bolt.collectProtocol.CollectCompletedBolt; +import cn.ac.iie.bolt.collectprotocol.CollectCompletedBolt; import cn.ac.iie.bolt.radius.RadiusCompletionBolt; import cn.ac.iie.bolt.security.SecurityCompletionBolt; @@ -64,9 +63,7 @@ public class LogFlowWriteTopology { builder = new TopologyBuilder(); builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); - String topic_name = FlowWriteConfig.KAFKA_TOPIC; - - switch(topic_name){ + switch (FlowWriteConfig.KAFKA_TOPIC) { case "PROXY-POLICY-LOG": builder.setBolt("ProxyCompletionBolt", new ProxyCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ProxyCompletionBolt"); @@ -87,22 +84,11 @@ public class LogFlowWriteTopology { builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("SecurityCompletionBolt"); break; - - } - - /*if ("PROXY_POLICY_LOG".equals(FlowWriteConfig.KAFKA_TOPIC) || "COLLECT_HTTP_META_LOG".equals(FlowWriteConfig.KAFKA_TOPIC)) { - builder.setBolt("HttpCompletionBolt", new HttpCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); - builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("HttpCompletionBolt"); - } else if ("COLLECT_RADIUS_RECORD_LOG".equals(FlowWriteConfig.KAFKA_TOPIC)) { - builder.setBolt("RadiusCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); - builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("RadiusCompletionBolt"); - } else { - builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); - builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt"); - }*/ -// builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt"); + default: } + } + public static void main(String[] args) throws Exception { LogFlowWriteTopology csst = null; boolean runLocally = true; diff --git a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java index c515689..4801314 100644 --- a/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java +++ b/src/main/java/cn/ac/iie/utils/general/TransFormUtils.java @@ -19,7 +19,7 @@ import java.util.regex.Pattern; * @author qidaijie * @create 2018-08-13 15:11 */ -@SuppressWarnings("all") + public class TransFormUtils { private static Logger logger = Logger.getLogger(TransFormUtils.class); private static Pattern WEB_PATTERN = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)"); @@ -34,6 +34,7 @@ public class TransFormUtils { /** * 解析日志,并补全 * 补subscriber_id,不补domain + * * @param message radius原始日志 * @return 补全后的日志 */ @@ -50,19 +51,17 @@ public class TransFormUtils { radiusSessionRecordLog.setCommon_server_asn(ipLookup.asnLookup(serverIp, true)); return JSONObject.toJSONString(radiusSessionRecordLog); } catch (Exception e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常", e); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常"); + e.printStackTrace(); return ""; } } - - - - /** * 解析日志,并补全 * 补domain,补subscriber_id + * * @param message Proxy原始日志 * @return 补全后的日志 */ @@ -83,16 +82,17 @@ public class TransFormUtils { proxySessionRecordLog.setHttp_domain(getTopDomain(null, proxySessionRecordLog.getHttp_host())); return JSONObject.toJSONString(proxySessionRecordLog); } catch (Exception e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常", e); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常"); + e.printStackTrace(); return ""; } } - /** * 解析日志,并补全 * 补domain,补subscriber_id + * * @param message Security原始日志 * @return 补全后的日志 */ @@ -113,7 +113,8 @@ public class TransFormUtils { securitySessionRecordLog.setHttp_domain(getTopDomain(securitySessionRecordLog.getSsl_sni(), securitySessionRecordLog.getHttp_host())); return JSONObject.toJSONString(securitySessionRecordLog); } catch (Exception e) { - logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常", e); + logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常"); + e.printStackTrace(); return ""; } } @@ -121,25 +122,26 @@ public class TransFormUtils { /** * 解析日志,并补全 * 补domain,补subscriber_id + * * @param message Security原始日志 * @return 补全后的日志 */ public static String getCollectProtocolMessage(String message) { - CollectProtocolRecordLog securitySessionRecordLog = JSONObject.parseObject(message, CollectProtocolRecordLog.class); - String serverIp = securitySessionRecordLog.getCommon_server_ip(); - String clientIp = securitySessionRecordLog.getCommon_client_ip(); + CollectProtocolRecordLog collectProtocolRecordLog = JSONObject.parseObject(message, CollectProtocolRecordLog.class); + String serverIp = collectProtocolRecordLog.getCommon_server_ip(); + String clientIp = collectProtocolRecordLog.getCommon_client_ip(); try { - securitySessionRecordLog.setCommon_log_id(SnowflakeId.generateId()); - securitySessionRecordLog.setCommon_recv_time((System.currentTimeMillis() / 1000)); - securitySessionRecordLog.setCommon_server_location(ipLookup.countryLookup(serverIp)); - securitySessionRecordLog.setCommon_client_location(ipLookup.cityLookupDetail(clientIp)); - securitySessionRecordLog.setCommon_client_asn(ipLookup.asnLookup(clientIp, true)); - securitySessionRecordLog.setCommon_server_asn(ipLookup.asnLookup(serverIp, true)); + collectProtocolRecordLog.setCommon_log_id(SnowflakeId.generateId()); + collectProtocolRecordLog.setCommon_recv_time((System.currentTimeMillis() / 1000)); + collectProtocolRecordLog.setCommon_server_location(ipLookup.countryLookup(serverIp)); + collectProtocolRecordLog.setCommon_client_location(ipLookup.cityLookupDetail(clientIp)); + collectProtocolRecordLog.setCommon_client_asn(ipLookup.asnLookup(clientIp, true)); + collectProtocolRecordLog.setCommon_server_asn(ipLookup.asnLookup(serverIp, true)); //TODO 集成AAA数据subscribe_id -数据端补全 - securitySessionRecordLog.setCommon_subscriber_id(HbaseUtils.getAccount(clientIp)); + collectProtocolRecordLog.setCommon_subscriber_id(HbaseUtils.getAccount(clientIp)); // securitySessionRecordLog.setCommon_subscriber_id("aaaaa"); - securitySessionRecordLog.setHttp_domain(getTopDomain(securitySessionRecordLog.getSsl_sni(), securitySessionRecordLog.getHttp_host())); - return JSONObject.toJSONString(securitySessionRecordLog); + collectProtocolRecordLog.setHttp_domain(getTopDomain(collectProtocolRecordLog.getSsl_sni(), collectProtocolRecordLog.getHttp_host())); + return JSONObject.toJSONString(collectProtocolRecordLog); } catch (Exception e) { logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志解析过程出现异常", e); return "";