替换某些字段类型为Integer类型,防止int类型为0影响本身语义

This commit is contained in:
qidaijie
2019-12-06 18:50:31 +08:00
parent 7aac9e03d5
commit fb22d60bfe
10 changed files with 114 additions and 189 deletions

View File

@@ -1,12 +1,13 @@
#管理kafka地址
bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
bootstrap.servers=192.168.40.186:9092
#zookeeper 地址
zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
#hbase zookeeper地址
hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
#hbase.zookeeper.servers=192.168.40.224:2182
#hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
hbase.zookeeper.servers=192.168.40.224:2182
#hbase tablename
hbase.table.name=subscriber_info
@@ -15,13 +16,13 @@ hbase.table.name=subscriber_info
auto.offset.reset=latest
#kafka broker下的topic名称
kafka.topic=SECURITY-EVENT-LOG
kafka.topic=CONNECTION-RECORD-LOG
#读取topic,存储该spout id的消费offset信息可通过该拓扑命名;具体存储offset的位置确定下次读取不重复的数据
group.id=security-policy-191114
#输出topic
results.output.topic=SECURITY-EVENT-COMPLETED-LOG
results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
#storm topology workers
topology.workers=3
@@ -37,7 +38,7 @@ kafka.bolt.parallelism=12
#定位库地址
ip.library=/home/ceiec/topology/dat/
#ip.library=D:/dat/
#ip.library=D:\\workerSpace\\K18-Phase2\\3.0.2019115\\log-stream-completion\\
#kafka批量条数
batch.insert.num=2000

View File

@@ -63,13 +63,6 @@ public class PublicSessionRecordLog {
this.common_service = common_service;
}
public int getCommon_direction() {
return common_direction;
}
public void setCommon_direction(int common_direction) {
this.common_direction = common_direction;
}
public long getCommon_recv_time() {
return common_recv_time;
@@ -127,13 +120,6 @@ public class PublicSessionRecordLog {
this.common_link_id = common_link_id;
}
public int getCommon_encapsulation() {
return common_encapsulation;
}
public void setCommon_encapsulation(int common_encapsulation) {
this.common_encapsulation = common_encapsulation;
}
public int getCommon_server_port() {
return common_server_port;
@@ -223,6 +209,22 @@ public class PublicSessionRecordLog {
this.common_stream_dir = common_stream_dir;
}
public int getCommon_direction() {
return common_direction;
}
public void setCommon_direction(int common_direction) {
this.common_direction = common_direction;
}
public int getCommon_encapsulation() {
return common_encapsulation;
}
public void setCommon_encapsulation(int common_encapsulation) {
this.common_encapsulation = common_encapsulation;
}
public int getCommon_has_dup_traffic() {
return common_has_dup_traffic;
}

View File

@@ -49,8 +49,8 @@ public class ConnectionRecordLog extends PublicSessionRecordLog {
//TODO 3DNS协议属性 18
private int dns_message_id;
private int dns_qr;
private int dns_opcode;
private Integer dns_qr;
private Integer dns_opcode;
private int dns_aa;
private int dns_tc;
private int dns_rd;
@@ -70,11 +70,11 @@ public class ConnectionRecordLog extends PublicSessionRecordLog {
//TODO SSL协议属性 13
private int ssl_pinningst;
private int ssl_intercept_state;
private Integer ssl_pinningst;
private Integer ssl_intercept_state;
private int ssl_server_side_latency;
private int ssl_client_side_latency;
private int ssl_cert_verify;
private Integer ssl_cert_verify;
private int ssl_con_latency_ms;
private String ssl_version;
private String ssl_sni;
@@ -161,6 +161,23 @@ public class ConnectionRecordLog extends PublicSessionRecordLog {
this.http_response_header = http_response_header;
}
public Integer getSsl_pinningst() {
return ssl_pinningst;
}
public void setSsl_pinningst(Integer ssl_pinningst) {
this.ssl_pinningst = ssl_pinningst;
}
public Integer getSsl_intercept_state() {
return ssl_intercept_state;
}
public void setSsl_intercept_state(Integer ssl_intercept_state) {
this.ssl_intercept_state = ssl_intercept_state;
}
public String getHttp_request_body() {
return http_request_body;
}
@@ -185,6 +202,14 @@ public class ConnectionRecordLog extends PublicSessionRecordLog {
this.http_request_body_key = http_request_body_key;
}
public Integer getSsl_cert_verify() {
return ssl_cert_verify;
}
public void setSsl_cert_verify(Integer ssl_cert_verify) {
this.ssl_cert_verify = ssl_cert_verify;
}
public String getHttp_response_body_key() {
return http_response_body_key;
}
@@ -353,19 +378,19 @@ public class ConnectionRecordLog extends PublicSessionRecordLog {
this.dns_message_id = dns_message_id;
}
public int getDns_qr() {
public Integer getDns_qr() {
return dns_qr;
}
public void setDns_qr(int dns_qr) {
public void setDns_qr(Integer dns_qr) {
this.dns_qr = dns_qr;
}
public int getDns_opcode() {
public Integer getDns_opcode() {
return dns_opcode;
}
public void setDns_opcode(int dns_opcode) {
public void setDns_opcode(Integer dns_opcode) {
this.dns_opcode = dns_opcode;
}
@@ -489,21 +514,6 @@ public class ConnectionRecordLog extends PublicSessionRecordLog {
this.dns_rr = dns_rr;
}
public int getSsl_pinningst() {
return ssl_pinningst;
}
public void setSsl_pinningst(int ssl_pinningst) {
this.ssl_pinningst = ssl_pinningst;
}
public int getSsl_intercept_state() {
return ssl_intercept_state;
}
public void setSsl_intercept_state(int ssl_intercept_state) {
this.ssl_intercept_state = ssl_intercept_state;
}
public int getSsl_server_side_latency() {
return ssl_server_side_latency;
@@ -521,14 +531,6 @@ public class ConnectionRecordLog extends PublicSessionRecordLog {
this.ssl_client_side_latency = ssl_client_side_latency;
}
public int getSsl_cert_verify() {
return ssl_cert_verify;
}
public void setSsl_cert_verify(int ssl_cert_verify) {
this.ssl_cert_verify = ssl_cert_verify;
}
public int getSsl_con_latency_ms() {
return ssl_con_latency_ms;
}

View File

@@ -49,8 +49,8 @@ public class SecurityPolicyLog extends PublicSessionRecordLog {
//TODO 3DNS协议属性 18
private int dns_message_id;
private int dns_qr;
private int dns_opcode;
private Integer dns_qr;
private Integer dns_opcode;
private int dns_aa;
private int dns_tc;
private int dns_rd;
@@ -70,11 +70,11 @@ public class SecurityPolicyLog extends PublicSessionRecordLog {
//TODO SSL协议属性 13
private int ssl_pinningst;
private int ssl_intercept_state;
private Integer ssl_pinningst;
private Integer ssl_intercept_state;
private int ssl_server_side_latency;
private int ssl_client_side_latency;
private int ssl_cert_verify;
private Integer ssl_cert_verify;
private int ssl_con_latency_ms;
private String ssl_version;
private String ssl_sni;
@@ -104,7 +104,6 @@ public class SecurityPolicyLog extends PublicSessionRecordLog {
private String streaming_media_url;
private String streaming_media_protocol;
public String getHttp_url() {
return http_url;
}
@@ -353,19 +352,19 @@ public class SecurityPolicyLog extends PublicSessionRecordLog {
this.dns_message_id = dns_message_id;
}
public int getDns_qr() {
public Integer getDns_qr() {
return dns_qr;
}
public void setDns_qr(int dns_qr) {
public void setDns_qr(Integer dns_qr) {
this.dns_qr = dns_qr;
}
public int getDns_opcode() {
public Integer getDns_opcode() {
return dns_opcode;
}
public void setDns_opcode(int dns_opcode) {
public void setDns_opcode(Integer dns_opcode) {
this.dns_opcode = dns_opcode;
}
@@ -489,19 +488,19 @@ public class SecurityPolicyLog extends PublicSessionRecordLog {
this.dns_rr = dns_rr;
}
public int getSsl_pinningst() {
public Integer getSsl_pinningst() {
return ssl_pinningst;
}
public void setSsl_pinningst(int ssl_pinningst) {
public void setSsl_pinningst(Integer ssl_pinningst) {
this.ssl_pinningst = ssl_pinningst;
}
public int getSsl_intercept_state() {
public Integer getSsl_intercept_state() {
return ssl_intercept_state;
}
public void setSsl_intercept_state(int ssl_intercept_state) {
public void setSsl_intercept_state(Integer ssl_intercept_state) {
this.ssl_intercept_state = ssl_intercept_state;
}
@@ -521,11 +520,11 @@ public class SecurityPolicyLog extends PublicSessionRecordLog {
this.ssl_client_side_latency = ssl_client_side_latency;
}
public int getSsl_cert_verify() {
public Integer getSsl_cert_verify() {
return ssl_cert_verify;
}
public void setSsl_cert_verify(int ssl_cert_verify) {
public void setSsl_cert_verify(Integer ssl_cert_verify) {
this.ssl_cert_verify = ssl_cert_verify;
}
@@ -665,7 +664,6 @@ public class SecurityPolicyLog extends PublicSessionRecordLog {
this.voip_called_number = voip_called_number;
}
public String getStreaming_media_url() {
return streaming_media_url;
}

View File

@@ -68,17 +68,14 @@ public class LogFlowWriteTopology {
builder.setBolt("ProxyCompletionBolt", new ProxyCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ProxyCompletionBolt");
break;
case "RADIUS-RECORD-LOG":
builder.setBolt("RadiusCompletionBolt", new RadiusCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("RadiusCompletionBolt");
break;
case "CONNECTION-RECORD-LOG":
builder.setBolt("CollectCompletedBolt", new CollectCompletedBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("CollectCompletedBolt");
break;
case "SECURITY-EVENT-LOG":
builder.setBolt("SecurityCompletionBolt", new SecurityCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("SecurityCompletionBolt");

View File

@@ -15,7 +15,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* HBase 工具类
@@ -25,7 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
public class HBaseUtils {
private final static Logger logger = Logger.getLogger(HBaseUtils.class);
private static Map<String, String> subIdMap = new ConcurrentHashMap<>(333334);
private static Map<String, String> subIdMap = new HashMap<>(333334);
// private static Map<String, String> subIdMap = new ConcurrentSkipListMap<>();
private static Connection connection;
private static Long time;

View File

@@ -1,85 +1 @@
{
"bgp_type": 0,
"common_action": 16,
"common_address_list": "",
"common_address_type": 4,
"common_app_id": 0,
"common_app_label": "",
"common_c2s_byte_num": 639,
"common_c2s_pkt_num": 1,
"common_client_asn": "36351",
"common_client_ip": "75.126.99.155",
"common_client_location": "Dallas\tTexas\tUnited States",
"common_client_port": 40846,
"common_con_duration_ms": 113814,
"common_device_id": "4586496",
"common_direction": 1,
"common_encapsulation": 8,
"common_end_time": 1574842412,
"common_entrance_id": 8,
"common_has_dup_traffic": 1,
"common_isp": "China Telecom",
"common_l4_protocol": "MPLS",
"common_link_id": 1,
"common_log_id": 172027081238036520,
"common_policy_id": 902,
"common_protocol_id": 0,
"common_recv_time": 1574842413,
"common_s2c_byte_num": 1360,
"common_s2c_pkt_num": 26,
"common_schema_type": "BGP",
"common_server_asn": "9050",
"common_server_ip": "92.85.69.150",
"common_server_location": "Romania",
"common_server_port": 53,
"common_service": 8,
"common_sled_ip": "192.168.10.58",
"common_start_time": 1574842361,
"common_stream_dir": 1,
"common_stream_error": "",
"common_stream_trace_id": 0,
"common_subscriber_id": "zareP",
"common_user_region": "973ebGTTwdBhecbqI9U724LJdyHWV3BOUIcy4jgtpd221GV2QSOLMZc2awba3GfqKCiQxfirv5NjptRbawXDIpw4pJ0Xg4WZJSKW",
"common_user_tags": "qeT9tif1iRp1qCq6pauMO0RqsV13ktQm4Jlp4ZBOFeaQufoJMbC5tQ70ebDI1F9Ffw8c580e9yd27v96M6i4CPN8mEDw1mIkMexT",
"dns_aa": 0,
"dns_ancount": 0,
"dns_arcount": 0,
"dns_message_id": 0,
"dns_nscount": 0,
"dns_opcode": 0,
"dns_qclass": 0,
"dns_qdcount": 0,
"dns_qr": 0,
"dns_qtype": 0,
"dns_ra": 0,
"dns_rcode": 0,
"dns_rd": 0,
"dns_sub": 0,
"dns_tc": 0,
"http_content_length": "48895",
"http_content_type": "application/x-jpg",
"http_domain": "zhiyin.cn",
"http_host": "v.zhiyin.cn97991",
"http_proxy_flag": 1,
"http_referer": "",
"http_request_body": "",
"http_request_body_key": "",
"http_request_header": "",
"http_request_line": "",
"http_response_body": "",
"http_response_body_key": "",
"http_response_header": "",
"http_response_line": "",
"http_sequence": 6,
"http_set_cookie": "",
"http_snapshot": "",
"http_url": "http://v.zhiyin.cn/watch/295.html1661741",
"http_user_agent": "",
"http_version": "http1",
"ssl_cert_verify": 0,
"ssl_client_side_latency": 0,
"ssl_con_latency_ms": 0,
"ssl_intercept_state": 0,
"ssl_pinningst": 0,
"ssl_server_side_latency": 0
}
{"common_stream_dir":3,"common_address_type":4,"common_client_ip":"82.200.242.225","common_server_ip":"82.200.242.69","common_client_port":59387,"common_server_port":1812,"common_c2s_pkt_num":2,"common_s2c_pkt_num":1,"common_c2s_byte_num":507,"common_s2c_byte_num":151,"common_start_time":1575534194,"common_end_time":1575534195,"common_con_duration_ms":1000,"common_stream_trace_id":0,"common_l4_protocol":"IPv4_UDP","common_address_list":"59387-1812-82.200.242.225-82.200.242.69","radius_packet_type":1,"radius_account":"Kuanysh79143","radius_service_type":2,"radius_acct_session_id":"473332153","radius_framed_ip":"82.200.242.225","common_policy_id":0,"common_service":162,"common_entrance_id":0,"common_direction":0,"common_device_id":0,"common_encapsulation":14,"common_link_id":0,"common_sled_ip":"192.168.40.119","common_schema_type":"RADIUS"}

View File

@@ -0,0 +1,22 @@
package cn.ac.iie.test.bean;
public class Student {
private String name;
private Integer age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}

View File

@@ -1,10 +1,7 @@
package cn.ac.iie.test.hbase;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.hbase.HBaseUtils;
import cn.ac.iie.utils.system.IpUtils;
import com.zdjizhi.utils.StringUtil;
import io.netty.util.collection.IntObjectHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -13,19 +10,18 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class HBaseTest {
private final static Logger logger = Logger.getLogger(HBaseTest.class);
// private static Map<Integer, String> subIdMap = new ConcurrentHashMap<Integer, String>(13333334);
private static Map<String, String> subIdMap = new HashMap<>(13333334);
private static Map<String, String> testMap = new ConcurrentSkipListMap<>();
private static Connection connection;
private static Long time;
@@ -35,7 +31,8 @@ public class HBaseTest {
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
// configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
configuration.set("hbase.zookeeper.quorum", "192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181");
// configuration.set("hbase.zookeeper.quorum", "192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181");
configuration.set("hbase.zookeeper.quorum", "192.168.40.224:2182");
configuration.set("hbase.client.retries.number", "3");
configuration.set("hbase.bulkload.retries.number", "3");
configuration.set("zookeeper.recovery.retry", "3");
@@ -50,9 +47,9 @@ public class HBaseTest {
@Test
public void change() {
// Long begin = System.currentTimeMillis();
// getAll();
// System.out.println(System.currentTimeMillis() - begin);
Long begin = System.currentTimeMillis();
getAll();
System.out.println(System.currentTimeMillis() - begin);
}
/**
@@ -118,10 +115,12 @@ public class HBaseTest {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
// subIdMap.put(Integer.valueOf(Bytes.toString(CellUtil.cloneRow(cell))), Bytes.toString(CellUtil.cloneValue(cell)));
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
// subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
testMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
}
}
logger.warn("获取全量后集合长度:" + subIdMap.size());
// logger.warn("获取全量后集合长度:" + subIdMap.size());
logger.warn("获取全量后集合长度:" + testMap.size());
logger.warn("获取全量耗时:" + (System.currentTimeMillis() - begin));
scanner.close();
} catch (IOException e) {

View File

@@ -1,29 +1,16 @@
package cn.ac.iie.test;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.general.TransFormUtils;
import com.alibaba.fastjson.JSON;
import cn.ac.iie.test.bean.Student;
import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class test {
private static Logger logger = Logger.getLogger(test.class);
public static void main(String[] args) {
String message = "{\"str_ea_m-t-r-a-ceid\":\"JSON\",\"uid\":\"0\"}";
// SessionRecordLog sessionRecordLog = JSONObject.parseObject(message, SessionRecordLog.class);
// System.out.println(JSONObject.toJSONString(sessionRecordLog));
JSONObject obj = JSONObject.parseObject(message);
obj.put("abc","bca");
System.out.println(obj.toString());
String message = "{\"name\":\"aaa\"}";
Student student = JSONObject.parseObject(message, Student.class);
System.out.println(JSONObject.toJSONString(student));
}
@Test
public void test2() {
logger.error("{} 日志解析过程出现异常" + FlowWriteConfig.KAFKA_TOPIC);
}
}