diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 24a5401..66f5690 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,12 +1,13 @@ #管理kafka地址 -bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092 +#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092 +bootstrap.servers=192.168.40.186:9092 #zookeeper 地址 zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 #hbase zookeeper地址 -hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 -#hbase.zookeeper.servers=192.168.40.224:2182 +#hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181 +hbase.zookeeper.servers=192.168.40.224:2182 #hbase tablename hbase.table.name=subscriber_info @@ -15,13 +16,13 @@ hbase.table.name=subscriber_info auto.offset.reset=latest #kafka broker下的topic名称 -kafka.topic=SECURITY-EVENT-LOG +kafka.topic=CONNECTION-RECORD-LOG #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; group.id=security-policy-191114 #输出topic -results.output.topic=SECURITY-EVENT-COMPLETED-LOG +results.output.topic=CONNECTION-RECORD-COMPLETED-LOG #storm topology workers topology.workers=3 @@ -37,7 +38,7 @@ kafka.bolt.parallelism=12 #定位库地址 ip.library=/home/ceiec/topology/dat/ -#ip.library=D:/dat/ +#ip.library=D:\\workerSpace\\K18-Phase2\\3.0.2019115\\log-stream-completion\\ #kafka批量条数 batch.insert.num=2000 diff --git a/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java b/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java index 541e7b1..123ccff 100644 --- a/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java +++ b/src/main/java/cn/ac/iie/bean/PublicSessionRecordLog.java @@ -63,13 +63,6 @@ public class PublicSessionRecordLog { this.common_service = common_service; } - public int getCommon_direction() { - return common_direction; - } - - public void setCommon_direction(int common_direction) { - this.common_direction = common_direction; - } public long getCommon_recv_time() { return common_recv_time; @@ -127,13 +120,6 @@ public class PublicSessionRecordLog { this.common_link_id = common_link_id; } - public int getCommon_encapsulation() { - return common_encapsulation; - } - - public void setCommon_encapsulation(int common_encapsulation) { - this.common_encapsulation = common_encapsulation; - } public int getCommon_server_port() { return common_server_port; @@ -223,6 +209,22 @@ public class PublicSessionRecordLog { this.common_stream_dir = common_stream_dir; } + public int getCommon_direction() { + return common_direction; + } + + public void setCommon_direction(int common_direction) { + this.common_direction = common_direction; + } + + public int getCommon_encapsulation() { + return common_encapsulation; + } + + public void setCommon_encapsulation(int common_encapsulation) { + this.common_encapsulation = common_encapsulation; + } + public int getCommon_has_dup_traffic() { return common_has_dup_traffic; } diff --git a/src/main/java/cn/ac/iie/bean/connection/ConnectionRecordLog.java b/src/main/java/cn/ac/iie/bean/connection/ConnectionRecordLog.java index e73c2d2..01d9ee4 100644 --- a/src/main/java/cn/ac/iie/bean/connection/ConnectionRecordLog.java +++ b/src/main/java/cn/ac/iie/bean/connection/ConnectionRecordLog.java @@ -49,8 +49,8 @@ public class ConnectionRecordLog extends PublicSessionRecordLog { //TODO 3DNS协议属性 18 private int dns_message_id; - private int dns_qr; - private int dns_opcode; + private Integer dns_qr; + private Integer dns_opcode; private int dns_aa; private int dns_tc; private int dns_rd; @@ -70,11 +70,11 @@ public class ConnectionRecordLog extends PublicSessionRecordLog { //TODO SSL协议属性 13 - private int ssl_pinningst; - private int ssl_intercept_state; + private Integer ssl_pinningst; + private Integer ssl_intercept_state; private int ssl_server_side_latency; private int ssl_client_side_latency; - private int ssl_cert_verify; + private Integer ssl_cert_verify; private int ssl_con_latency_ms; private String ssl_version; private String ssl_sni; @@ -161,6 +161,23 @@ public class ConnectionRecordLog extends PublicSessionRecordLog { this.http_response_header = http_response_header; } + + public Integer getSsl_pinningst() { + return ssl_pinningst; + } + + public void setSsl_pinningst(Integer ssl_pinningst) { + this.ssl_pinningst = ssl_pinningst; + } + + public Integer getSsl_intercept_state() { + return ssl_intercept_state; + } + + public void setSsl_intercept_state(Integer ssl_intercept_state) { + this.ssl_intercept_state = ssl_intercept_state; + } + public String getHttp_request_body() { return http_request_body; } @@ -185,6 +202,14 @@ public class ConnectionRecordLog extends PublicSessionRecordLog { this.http_request_body_key = http_request_body_key; } + public Integer getSsl_cert_verify() { + return ssl_cert_verify; + } + + public void setSsl_cert_verify(Integer ssl_cert_verify) { + this.ssl_cert_verify = ssl_cert_verify; + } + public String getHttp_response_body_key() { return http_response_body_key; } @@ -353,19 +378,19 @@ public class ConnectionRecordLog extends PublicSessionRecordLog { this.dns_message_id = dns_message_id; } - public int getDns_qr() { + public Integer getDns_qr() { return dns_qr; } - public void setDns_qr(int dns_qr) { + public void setDns_qr(Integer dns_qr) { this.dns_qr = dns_qr; } - public int getDns_opcode() { + public Integer getDns_opcode() { return dns_opcode; } - public void setDns_opcode(int dns_opcode) { + public void setDns_opcode(Integer dns_opcode) { this.dns_opcode = dns_opcode; } @@ -489,21 +514,6 @@ public class ConnectionRecordLog extends PublicSessionRecordLog { this.dns_rr = dns_rr; } - public int getSsl_pinningst() { - return ssl_pinningst; - } - - public void setSsl_pinningst(int ssl_pinningst) { - this.ssl_pinningst = ssl_pinningst; - } - - public int getSsl_intercept_state() { - return ssl_intercept_state; - } - - public void setSsl_intercept_state(int ssl_intercept_state) { - this.ssl_intercept_state = ssl_intercept_state; - } public int getSsl_server_side_latency() { return ssl_server_side_latency; @@ -521,14 +531,6 @@ public class ConnectionRecordLog extends PublicSessionRecordLog { this.ssl_client_side_latency = ssl_client_side_latency; } - public int getSsl_cert_verify() { - return ssl_cert_verify; - } - - public void setSsl_cert_verify(int ssl_cert_verify) { - this.ssl_cert_verify = ssl_cert_verify; - } - public int getSsl_con_latency_ms() { return ssl_con_latency_ms; } diff --git a/src/main/java/cn/ac/iie/bean/security/SecurityPolicyLog.java b/src/main/java/cn/ac/iie/bean/security/SecurityPolicyLog.java index e266b5e..d6a4e54 100644 --- a/src/main/java/cn/ac/iie/bean/security/SecurityPolicyLog.java +++ b/src/main/java/cn/ac/iie/bean/security/SecurityPolicyLog.java @@ -49,8 +49,8 @@ public class SecurityPolicyLog extends PublicSessionRecordLog { //TODO 3DNS协议属性 18 private int dns_message_id; - private int dns_qr; - private int dns_opcode; + private Integer dns_qr; + private Integer dns_opcode; private int dns_aa; private int dns_tc; private int dns_rd; @@ -70,11 +70,11 @@ public class SecurityPolicyLog extends PublicSessionRecordLog { //TODO SSL协议属性 13 - private int ssl_pinningst; - private int ssl_intercept_state; + private Integer ssl_pinningst; + private Integer ssl_intercept_state; private int ssl_server_side_latency; private int ssl_client_side_latency; - private int ssl_cert_verify; + private Integer ssl_cert_verify; private int ssl_con_latency_ms; private String ssl_version; private String ssl_sni; @@ -104,7 +104,6 @@ public class SecurityPolicyLog extends PublicSessionRecordLog { private String streaming_media_url; private String streaming_media_protocol; - public String getHttp_url() { return http_url; } @@ -353,19 +352,19 @@ public class SecurityPolicyLog extends PublicSessionRecordLog { this.dns_message_id = dns_message_id; } - public int getDns_qr() { + public Integer getDns_qr() { return dns_qr; } - public void setDns_qr(int dns_qr) { + public void setDns_qr(Integer dns_qr) { this.dns_qr = dns_qr; } - public int getDns_opcode() { + public Integer getDns_opcode() { return dns_opcode; } - public void setDns_opcode(int dns_opcode) { + public void setDns_opcode(Integer dns_opcode) { this.dns_opcode = dns_opcode; } @@ -489,19 +488,19 @@ public class SecurityPolicyLog extends PublicSessionRecordLog { this.dns_rr = dns_rr; } - public int getSsl_pinningst() { + public Integer getSsl_pinningst() { return ssl_pinningst; } - public void setSsl_pinningst(int ssl_pinningst) { + public void setSsl_pinningst(Integer ssl_pinningst) { this.ssl_pinningst = ssl_pinningst; } - public int getSsl_intercept_state() { + public Integer getSsl_intercept_state() { return ssl_intercept_state; } - public void setSsl_intercept_state(int ssl_intercept_state) { + public void setSsl_intercept_state(Integer ssl_intercept_state) { this.ssl_intercept_state = ssl_intercept_state; } @@ -521,11 +520,11 @@ public class SecurityPolicyLog extends PublicSessionRecordLog { this.ssl_client_side_latency = ssl_client_side_latency; } - public int getSsl_cert_verify() { + public Integer getSsl_cert_verify() { return ssl_cert_verify; } - public void setSsl_cert_verify(int ssl_cert_verify) { + public void setSsl_cert_verify(Integer ssl_cert_verify) { this.ssl_cert_verify = ssl_cert_verify; } @@ -665,7 +664,6 @@ public class SecurityPolicyLog extends PublicSessionRecordLog { this.voip_called_number = voip_called_number; } - public String getStreaming_media_url() { return streaming_media_url; } diff --git a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java index c5c5090..d4eda58 100644 --- a/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java +++ b/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java @@ -68,17 +68,14 @@ public class LogFlowWriteTopology { builder.setBolt("ProxyCompletionBolt", new ProxyCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ProxyCompletionBolt"); break; - case "RADIUS-RECORD-LOG": builder.setBolt("RadiusCompletionBolt", new RadiusCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("RadiusCompletionBolt"); break; - case "CONNECTION-RECORD-LOG": builder.setBolt("CollectCompletedBolt", new CollectCompletedBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("CollectCompletedBolt"); break; - case "SECURITY-EVENT-LOG": builder.setBolt("SecurityCompletionBolt", new SecurityCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("SecurityCompletionBolt"); diff --git a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java b/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java index 068f619..2b60e43 100644 --- a/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java +++ b/src/main/java/cn/ac/iie/utils/hbase/HBaseUtils.java @@ -15,7 +15,7 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; /** * HBase 工具类 @@ -25,7 +25,8 @@ import java.util.concurrent.ConcurrentHashMap; public class HBaseUtils { private final static Logger logger = Logger.getLogger(HBaseUtils.class); - private static Map subIdMap = new ConcurrentHashMap<>(333334); + private static Map subIdMap = new HashMap<>(333334); +// private static Map subIdMap = new ConcurrentSkipListMap<>(); private static Connection connection; private static Long time; diff --git a/src/test/java/cn/ac/iie/test/a.json b/src/test/java/cn/ac/iie/test/a.json index e70c1b8..e80ed52 100644 --- a/src/test/java/cn/ac/iie/test/a.json +++ b/src/test/java/cn/ac/iie/test/a.json @@ -1,85 +1 @@ -{ - "bgp_type": 0, - "common_action": 16, - "common_address_list": "", - "common_address_type": 4, - "common_app_id": 0, - "common_app_label": "", - "common_c2s_byte_num": 639, - "common_c2s_pkt_num": 1, - "common_client_asn": "36351", - "common_client_ip": "75.126.99.155", - "common_client_location": "Dallas\tTexas\tUnited States", - "common_client_port": 40846, - "common_con_duration_ms": 113814, - "common_device_id": "4586496", - "common_direction": 1, - "common_encapsulation": 8, - "common_end_time": 1574842412, - "common_entrance_id": 8, - "common_has_dup_traffic": 1, - "common_isp": "China Telecom", - "common_l4_protocol": "MPLS", - "common_link_id": 1, - "common_log_id": 172027081238036520, - "common_policy_id": 902, - "common_protocol_id": 0, - "common_recv_time": 1574842413, - "common_s2c_byte_num": 1360, - "common_s2c_pkt_num": 26, - "common_schema_type": "BGP", - "common_server_asn": "9050", - "common_server_ip": "92.85.69.150", - "common_server_location": "Romania", - "common_server_port": 53, - "common_service": 8, - "common_sled_ip": "192.168.10.58", - "common_start_time": 1574842361, - "common_stream_dir": 1, - "common_stream_error": "", - "common_stream_trace_id": 0, - "common_subscriber_id": "zareP", - "common_user_region": "973ebGTTwdBhecbqI9U724LJdyHWV3BOUIcy4jgtpd221GV2QSOLMZc2awba3GfqKCiQxfirv5NjptRbawXDIpw4pJ0Xg4WZJSKW", - "common_user_tags": "qeT9tif1iRp1qCq6pauMO0RqsV13ktQm4Jlp4ZBOFeaQufoJMbC5tQ70ebDI1F9Ffw8c580e9yd27v96M6i4CPN8mEDw1mIkMexT", - "dns_aa": 0, - "dns_ancount": 0, - "dns_arcount": 0, - "dns_message_id": 0, - "dns_nscount": 0, - "dns_opcode": 0, - "dns_qclass": 0, - "dns_qdcount": 0, - "dns_qr": 0, - "dns_qtype": 0, - "dns_ra": 0, - "dns_rcode": 0, - "dns_rd": 0, - "dns_sub": 0, - "dns_tc": 0, - "http_content_length": "48895", - "http_content_type": "application/x-jpg", - "http_domain": "zhiyin.cn", - "http_host": "v.zhiyin.cn97991", - "http_proxy_flag": 1, - "http_referer": "", - "http_request_body": "", - "http_request_body_key": "", - "http_request_header": "", - "http_request_line": "", - "http_response_body": "", - "http_response_body_key": "", - "http_response_header": "", - "http_response_line": "", - "http_sequence": 6, - "http_set_cookie": "", - "http_snapshot": "", - "http_url": "http://v.zhiyin.cn/watch/295.html1661741", - "http_user_agent": "", - "http_version": "http1", - "ssl_cert_verify": 0, - "ssl_client_side_latency": 0, - "ssl_con_latency_ms": 0, - "ssl_intercept_state": 0, - "ssl_pinningst": 0, - "ssl_server_side_latency": 0 -} +{"common_stream_dir":3,"common_address_type":4,"common_client_ip":"82.200.242.225","common_server_ip":"82.200.242.69","common_client_port":59387,"common_server_port":1812,"common_c2s_pkt_num":2,"common_s2c_pkt_num":1,"common_c2s_byte_num":507,"common_s2c_byte_num":151,"common_start_time":1575534194,"common_end_time":1575534195,"common_con_duration_ms":1000,"common_stream_trace_id":0,"common_l4_protocol":"IPv4_UDP","common_address_list":"59387-1812-82.200.242.225-82.200.242.69","radius_packet_type":1,"radius_account":"Kuanysh79143","radius_service_type":2,"radius_acct_session_id":"473332153","radius_framed_ip":"82.200.242.225","common_policy_id":0,"common_service":162,"common_entrance_id":0,"common_direction":0,"common_device_id":0,"common_encapsulation":14,"common_link_id":0,"common_sled_ip":"192.168.40.119","common_schema_type":"RADIUS"} \ No newline at end of file diff --git a/src/test/java/cn/ac/iie/test/bean/Student.java b/src/test/java/cn/ac/iie/test/bean/Student.java new file mode 100644 index 0000000..3383bc0 --- /dev/null +++ b/src/test/java/cn/ac/iie/test/bean/Student.java @@ -0,0 +1,22 @@ +package cn.ac.iie.test.bean; + +public class Student { + private String name; + private Integer age; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getAge() { + return age; + } + + public void setAge(Integer age) { + this.age = age; + } +} diff --git a/src/test/java/cn/ac/iie/test/hbase/HBaseTest.java b/src/test/java/cn/ac/iie/test/hbase/HBaseTest.java index 44f176a..a6fb619 100644 --- a/src/test/java/cn/ac/iie/test/hbase/HBaseTest.java +++ b/src/test/java/cn/ac/iie/test/hbase/HBaseTest.java @@ -1,10 +1,7 @@ package cn.ac.iie.test.hbase; import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.hbase.HBaseUtils; import cn.ac.iie.utils.system.IpUtils; -import com.zdjizhi.utils.StringUtil; -import io.netty.util.collection.IntObjectHashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -13,19 +10,18 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; -import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; public class HBaseTest { private final static Logger logger = Logger.getLogger(HBaseTest.class); // private static Map subIdMap = new ConcurrentHashMap(13333334); private static Map subIdMap = new HashMap<>(13333334); + private static Map testMap = new ConcurrentSkipListMap<>(); private static Connection connection; private static Long time; @@ -35,7 +31,8 @@ public class HBaseTest { Configuration configuration = HBaseConfiguration.create(); // 设置zookeeper节点 // configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS); - configuration.set("hbase.zookeeper.quorum", "192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181"); +// configuration.set("hbase.zookeeper.quorum", "192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181"); + configuration.set("hbase.zookeeper.quorum", "192.168.40.224:2182"); configuration.set("hbase.client.retries.number", "3"); configuration.set("hbase.bulkload.retries.number", "3"); configuration.set("zookeeper.recovery.retry", "3"); @@ -50,9 +47,9 @@ public class HBaseTest { @Test public void change() { -// Long begin = System.currentTimeMillis(); -// getAll(); -// System.out.println(System.currentTimeMillis() - begin); + Long begin = System.currentTimeMillis(); + getAll(); + System.out.println(System.currentTimeMillis() - begin); } /** @@ -118,10 +115,12 @@ public class HBaseTest { Cell[] cells = result.rawCells(); for (Cell cell : cells) { // subIdMap.put(Integer.valueOf(Bytes.toString(CellUtil.cloneRow(cell))), Bytes.toString(CellUtil.cloneValue(cell))); - subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); +// subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + testMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } - logger.warn("获取全量后集合长度:" + subIdMap.size()); +// logger.warn("获取全量后集合长度:" + subIdMap.size()); + logger.warn("获取全量后集合长度:" + testMap.size()); logger.warn("获取全量耗时:" + (System.currentTimeMillis() - begin)); scanner.close(); } catch (IOException e) { diff --git a/src/test/java/cn/ac/iie/test/test.java b/src/test/java/cn/ac/iie/test/test.java index 7876fa2..aa5b7db 100644 --- a/src/test/java/cn/ac/iie/test/test.java +++ b/src/test/java/cn/ac/iie/test/test.java @@ -1,29 +1,16 @@ package cn.ac.iie.test; -import cn.ac.iie.common.FlowWriteConfig; -import cn.ac.iie.utils.general.TransFormUtils; -import com.alibaba.fastjson.JSON; +import cn.ac.iie.test.bean.Student; import com.alibaba.fastjson.JSONObject; import org.apache.log4j.Logger; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; public class test { private static Logger logger = Logger.getLogger(test.class); + public static void main(String[] args) { - String message = "{\"str_ea_m-t-r-a-ceid\":\"JSON\",\"uid\":\"0\"}"; -// SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class); -// System.out.println(JSONObject.toJSONString(sessionRecordLog)); - JSONObject obj = JSONObject.parseObject(message); - obj.put("abc","bca"); - System.out.println(obj.toString()); - + String message = "{\"name\":\"aaa\"}"; + Student student = JSONObject.parseObject(message, Student.class); + System.out.println(JSONObject.toJSONString(student)); } - @Test - public void test2() { - logger.error("{} 日志解析过程出现异常" + FlowWriteConfig.KAFKA_TOPIC); - } }