Merge branch 'tsg_galaxy_com_v3.0.2019115' of https://git.mesalab.cn/bigdata/tsg/log-stream-completion into tsg_galaxy_com_v3.0.2019115

李玺康
2019-11-12 16:21:44 +08:00
14 changed files with 112 additions and 89 deletions

View File

@@ -17,7 +17,7 @@ auto.offset.reset=latest
kafka.topic=SECURITY-POLICY-LOG
#Consumer group for the topic; offsets consumed under this spout id are stored per topology name, and the stored offset position ensures the next read resumes without duplicates
-group.id=session-completion-program
+group.id=security-policy-191112
#Output topic
results.output.topic=SECURITY-POLICY-COMPLETED-LOG
@@ -35,16 +35,20 @@ datacenter.bolt.parallelism=1
kafka.bolt.parallelism=1
#Path to the IP geolocation library
-#ip.library=/home/ceiec/topology/dat/
-ip.library=D:/dat/
+ip.library=/home/ceiec/topology/dat/
+#ip.library=D:/dat/
#Kafka batch size (records per batched send)
-batch.insert.num=5000
+batch.insert.num=2000
#Data center UID
-data.center.id.num=10
+data.center.id.num=15
#Tick tuple frequency (seconds)
-topology.tick.tuple.freq.secs=60
+topology.tick.tuple.freq.secs=5
+#HBase refresh interval (seconds)
+hbase.tick.tuple.freq.secs=60
+#Caps the spout ingest rate when bolts are the bottleneck; in theory only effective when acking is enabled
+topology.config.max.spout.pending=150000
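For context: these properties feed org.apache.storm.Config when the topology is submitted. A minimal sketch of that wiring (not code from this repo; the mapping is an assumption based on the property names):

    import org.apache.storm.Config;

    public class TopologyConfSketch {
        static Config build() {
            Config conf = new Config();
            // Caps in-flight tuples per spout task; Storm only enforces this when acking is enabled.
            conf.setMaxSpoutPending(150000);
            // Topology-wide default tick interval (seconds); individual bolts can override it.
            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
            return conf;
        }
    }

With acking disabled, max.spout.pending is ignored, which is what the last comment above hints at.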

View File

@@ -1,6 +1,11 @@
package cn.ac.iie.bean;
-public class CollectProtocolRecordLog extends PublicSessionRecordLog{
+/**
+ * All protocols other than radius
+ *
+ * @author qidaijie
+ */
+public class CollectProtocolRecordLog extends PublicSessionRecordLog {
//TODO HTTP protocol fields 21
@@ -77,23 +82,23 @@ public class CollectProtocolRecordLog extends PublicSessionRecordLog{
private String ssl_client_side_version;
private String ssl_error;
//TODO FTP protocol fields 2
private String ftp_url;
private String ftp_content;
//TODO BGP protocol fields 3
private int bgp_type;
private String bgp_as_num;
private String bgp_route;
//TODO VOIP protocol fields 4
private String voip_calling_account;
private String voip_called_account;
private String voip_calling_number;
private String voip_called_number;
//TODO STREAMING_MEDIA protocol fields 2
private String streaming_media_url;
private String streaming_media_protocol;

View File

@@ -1,6 +1,10 @@
package cn.ac.iie.bean;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.support.spring.annotation.FastJsonFilter;
+/**
+ * @author qidaijie
+ */
public class ProxySessionRecordLog extends PublicSessionRecordLog{

View File

@@ -1,11 +1,15 @@
package cn.ac.iie.bean;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.support.spring.annotation.FastJsonFilter;
+/**
+ * Common base class
+ *
+ * @author qidaijie
+ */
public class PublicSessionRecordLog {
//TODO basic fields 40 (int 22)
private int common_service;
private int common_direction;
private long common_recv_time;
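These beans are round-tripped through fastjson in TransFormUtils further down. A minimal round-trip sketch, assuming the bean exposes the usual public getters/setters (the field values here are made up):

    import com.alibaba.fastjson.JSONObject;

    public class BeanRoundTripSketch {
        public static void main(String[] args) {
            // Parse a raw log into the bean, enrich one field, and serialize back.
            PublicSessionRecordLog log = JSONObject.parseObject(
                    "{\"common_service\":1,\"common_direction\":0}", PublicSessionRecordLog.class);
            log.setCommon_recv_time(System.currentTimeMillis() / 1000);
            System.out.println(JSONObject.toJSONString(log));
        }
    }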

View File

@@ -1,7 +1,12 @@
package cn.ac.iie.bean;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.support.spring.annotation.FastJsonFilter;
-public class RadiusSessionRecordLog extends PublicSessionRecordLog{
+/**
+ * Radius log
+ *
+ * @author qidaijie
+ */
+public class RadiusSessionRecordLog extends PublicSessionRecordLog {
//TODO RADIUS protocol fields 4

View File

@@ -1,6 +1,11 @@
package cn.ac.iie.bean;
-public class SecurityPolicyLog extends PublicSessionRecordLog{
+/**
+ * Policy
+ *
+ * @author qidaijie
+ */
+public class SecurityPolicyLog extends PublicSessionRecordLog {
//TODO HTTP protocol fields 21
@@ -77,23 +82,23 @@ public class SecurityPolicyLog extends PublicSessionRecordLog{
private String ssl_client_side_version;
private String ssl_error;
//TODO FTP protocol fields 2
private String ftp_url;
private String ftp_content;
//TODO BGP protocol fields 3
private int bgp_type;
private String bgp_as_num;
private String bgp_route;
//TODO VOIP protocol fields 4
private String voip_calling_account;
private String voip_called_account;
private String voip_calling_number;
private String voip_called_number;
//TODO STREAMING_MEDIA protocol fields 2
private String streaming_media_url;
private String streaming_media_protocol;

View File

@@ -24,11 +24,11 @@ import java.util.Map;
* @date 2018/8/14
*/
public class NtcLogSendBolt extends BaseBasicBolt {
-private static final long serialVersionUID = 3940515789830317517L;
+private static final long serialVersionUID = -3663610927224396615L;
private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
private List<String> list;
private KafkaLogNtc kafkaLogNtc;
-private static long successfulSum = 0;
+// private static long successfulSum = 0;
@Override
@@ -43,11 +43,11 @@ public class NtcLogSendBolt extends BaseBasicBolt {
if (TupleUtils.isTick(tuple)) {
if (list.size() != 0) {
kafkaLogNtc.sendMessage(list);
-successfulSum += list.size();
+// successfulSum += list.size();
list.clear();
}
-basicOutputCollector.emit(new Values(successfulSum));
-successfulSum = 0L;
+// basicOutputCollector.emit(new Values(successfulSum));
+// successfulSum = 0L;
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
@@ -55,12 +55,12 @@ public class NtcLogSendBolt extends BaseBasicBolt {
}
if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) {
kafkaLogNtc.sendMessage(list);
-successfulSum += list.size();
+// successfulSum += list.size();
list.clear();
}
}
} catch (Exception e) {
logger.error("日志发送Kafka过程出现异常");
logger.error(FlowWriteConfig.KAFKA_TOPIC + "日志发送Kafka过程出现异常");
e.printStackTrace();
}
}
@@ -74,7 +74,7 @@ public class NtcLogSendBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-outputFieldsDeclarer.declare(new Fields("suc"));
+// outputFieldsDeclarer.declare(new Fields("suc"));
}
}
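The bolt above implements a dual-trigger batch flush: send when the buffer reaches batch.insert.num, and on every tick so a quiet topic cannot strand buffered messages. A self-contained sketch of that pattern, with the hypothetical KafkaSender standing in for the repo's KafkaLogNtc:

    import java.util.ArrayList;
    import java.util.List;

    class BatchFlusher {
        interface KafkaSender { void sendMessage(List<String> batch); }

        private final List<String> buffer = new ArrayList<>();
        private final KafkaSender sender;
        private final int batchSize;

        BatchFlusher(KafkaSender sender, int batchSize) {
            this.sender = sender;
            this.batchSize = batchSize;
        }

        // Size-based trigger: flush as soon as a full batch is buffered.
        void onMessage(String msg) {
            buffer.add(msg);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        // Time-based trigger: on each tick, drain whatever is buffered.
        void onTick() {
            if (!buffer.isEmpty()) {
                flush();
            }
        }

        private void flush() {
            sender.sendMessage(new ArrayList<>(buffer)); // copy: the sender may keep the batch
            buffer.clear();
        }
    }

Passing a copy to the sender avoids the aliasing hazard of clearing a list the sender still holds, a caveat the original code shares.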

View File

@@ -1,4 +1,4 @@
-package cn.ac.iie.bolt.collectProtocol;
+package cn.ac.iie.bolt.collectprotocol;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.system.TupleUtils;
@@ -23,10 +23,10 @@ import static cn.ac.iie.utils.hbase.HbaseUtils.change;
*
* @author qidaijie
*/
+@SuppressWarnings("all")
public class CollectCompletedBolt extends BaseBasicBolt {
-private static final long serialVersionUID = -105915167013846589L;
private final static Logger logger = Logger.getLogger(CollectCompletedBolt.class);
+private static final long serialVersionUID = 4682827168247333522L;
@Override
public void prepare(Map stormConf, TopologyContext context) {
@@ -38,14 +38,15 @@ public class CollectCompletedBolt extends BaseBasicBolt {
try {
if (TupleUtils.isTick(tuple)) {
change();
-}else {
+} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(getCollectProtocolMessage(message)));
}
}
} catch (Exception e) {
logger.error("接收解析过程出现异常", e);
logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
e.printStackTrace();
}
}
@@ -53,7 +54,7 @@ public class CollectCompletedBolt extends BaseBasicBolt {
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
-FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);
return conf;
}
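This bolt refreshes its HBase cache on tick tuples, now paced by hbase.tick.tuple.freq.secs rather than the topology-wide setting. For reference, the check a TupleUtils.isTick helper usually performs (an assumption about the repo's cn.ac.iie.utils.system.TupleUtils; Storm's own org.apache.storm.utils.TupleUtils.isTick is equivalent):

    import org.apache.storm.Constants;
    import org.apache.storm.tuple.Tuple;

    final class TickCheck {
        // Tick tuples arrive from the system component on the dedicated tick stream.
        static boolean isTick(Tuple tuple) {
            return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                    && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
        }
    }

The per-component override in getComponentConfiguration is what lets these bolts tick every 60 seconds while NtcLogSendBolt keeps the 5-second topology default for its Kafka flush.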

View File

@@ -23,10 +23,10 @@ import static cn.ac.iie.utils.hbase.HbaseUtils.change;
*
* @author qidaijie
*/
+@SuppressWarnings("all")
public class ProxyCompletionBolt extends BaseBasicBolt {
-private static final long serialVersionUID = -1059151670838465894L;
private final static Logger logger = Logger.getLogger(ProxyCompletionBolt.class);
+private static final long serialVersionUID = 6097654428594885032L;
@Override
public void prepare(Map stormConf, TopologyContext context) {
@@ -38,14 +38,15 @@ public class ProxyCompletionBolt extends BaseBasicBolt {
try {
if (TupleUtils.isTick(tuple)) {
change();
-}else {
+} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(getProxyMessage(message)));
}
}
} catch (Exception e) {
logger.error("接收解析过程出现异常", e);
logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
e.printStackTrace();
}
}
@@ -53,7 +54,7 @@ public class ProxyCompletionBolt extends BaseBasicBolt {
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
-FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);
return conf;
}

View File

@@ -1,5 +1,6 @@
package cn.ac.iie.bolt.radius;
+import cn.ac.iie.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
@@ -20,8 +21,9 @@ import static cn.ac.iie.utils.general.TransFormUtils.getRadiusMessage;
* @author qidaijie
*/
public class RadiusCompletionBolt extends BaseBasicBolt {
-private static final long serialVersionUID = -3657802387129063952L;
private final static Logger logger = Logger.getLogger(RadiusCompletionBolt.class);
+private static final long serialVersionUID = -3657802387129063952L;
@Override
public void prepare(Map stormConf, TopologyContext context) {
@@ -36,7 +38,8 @@ public class RadiusCompletionBolt extends BaseBasicBolt {
basicOutputCollector.emit(new Values(getRadiusMessage(message)));
}
} catch (Exception e) {
logger.error("接收解析过程出现异常", e);
logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
e.printStackTrace();
}
}

View File

@@ -23,10 +23,11 @@ import static cn.ac.iie.utils.general.TransFormUtils.getSecurityMessage;
*
* @author qidaijie
*/
+@SuppressWarnings("all")
public class SecurityCompletionBolt extends BaseBasicBolt {
-private static final long serialVersionUID = -1059151770138464874L;
private final static Logger logger = Logger.getLogger(SecurityCompletionBolt.class);
+private static final long serialVersionUID = -2380858260054733989L;
@Override
public void prepare(Map stormConf, TopologyContext context) {
@@ -38,14 +39,15 @@ public class SecurityCompletionBolt extends BaseBasicBolt {
try {
if (TupleUtils.isTick(tuple)) {
HbaseUtils.change();
-}else {
+} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
basicOutputCollector.emit(new Values(getSecurityMessage(message)));
}
}
} catch (Exception e) {
logger.error("接收解析过程出现异常", e);
logger.error(FlowWriteConfig.KAFKA_TOPIC + "接收/解析过程出现异常");
e.printStackTrace();
}
}
@@ -53,7 +55,7 @@ public class SecurityCompletionBolt extends BaseBasicBolt {
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
-FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);
return conf;
}

View File

@@ -20,6 +20,7 @@ public class FlowWriteConfig {
public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers");
public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
+public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks");
public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time");
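FlowWriteConfigurations itself is not part of this diff. As a rough stand-in, a classpath properties loader matching the getIntProperty(defaultValue, key) call sites might look like this (file name and fallback behavior are assumptions):

    import java.io.InputStream;
    import java.util.Properties;

    final class FlowWriteConfigurationsSketch {
        private static final Properties PROPS = new Properties();

        static {
            // Hypothetical resource name; the real loader may read a different file.
            try (InputStream in = FlowWriteConfigurationsSketch.class
                    .getClassLoader().getResourceAsStream("flow_write.properties")) {
                if (in != null) {
                    PROPS.load(in);
                }
            } catch (Exception e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        // Mirrors the getIntProperty(defaultValue, key) signature used above.
        static Integer getIntProperty(int defaultValue, String key) {
            String value = PROPS.getProperty(key);
            return value == null ? defaultValue : Integer.parseInt(value.trim());
        }
    }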

View File

@@ -1,9 +1,8 @@
package cn.ac.iie.topology;
import cn.ac.iie.bean.CollectProtocolRecordLog;
import cn.ac.iie.bolt.NtcLogSendBolt;
-import cn.ac.iie.bolt.collectProtocol.CollectCompletedBolt;
+import cn.ac.iie.bolt.collectprotocol.CollectCompletedBolt;
import cn.ac.iie.bolt.radius.RadiusCompletionBolt;
import cn.ac.iie.bolt.security.SecurityCompletionBolt;
@@ -64,9 +63,7 @@ public class LogFlowWriteTopology {
builder = new TopologyBuilder();
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
-String topic_name = FlowWriteConfig.KAFKA_TOPIC;
-switch(topic_name){
+switch (FlowWriteConfig.KAFKA_TOPIC) {
case "PROXY-POLICY-LOG":
builder.setBolt("ProxyCompletionBolt", new ProxyCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ProxyCompletionBolt");
@@ -87,22 +84,11 @@ public class LogFlowWriteTopology {
builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("SecurityCompletionBolt");
break;
-}
-/*if ("PROXY_POLICY_LOG".equals(FlowWriteConfig.KAFKA_TOPIC) || "COLLECT_HTTP_META_LOG".equals(FlowWriteConfig.KAFKA_TOPIC)) {
-builder.setBolt("HttpCompletionBolt", new HttpCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
-builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("HttpCompletionBolt");
-} else if ("COLLECT_RADIUS_RECORD_LOG".equals(FlowWriteConfig.KAFKA_TOPIC)) {
-builder.setBolt("RadiusCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
-builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("RadiusCompletionBolt");
-} else {
-builder.setBolt("ConnCompletionBolt", new ConnCompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
-builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("ConnCompletionBolt");
-}*/
-// builder.setBolt("SummaryBolt", new SummaryBolt(), 1).localOrShuffleGrouping("NtcLogSendBolt");
+default:
+}
}
public static void main(String[] args) throws Exception {
LogFlowWriteTopology csst = null;
boolean runLocally = true;
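After this change the routing reduces to one wiring branch per topic with an empty default for unknown topics. A condensed, self-contained sketch of that shape (topic names other than PROXY-POLICY-LOG and SECURITY-POLICY-LOG are placeholders):

    final class TopicRouterSketch {
        static String completionBoltFor(String topic) {
            switch (topic) {
                case "PROXY-POLICY-LOG":
                    return "ProxyCompletionBolt";
                case "SECURITY-POLICY-LOG":
                    return "SecurityCompletionBolt";
                case "RADIUS-RECORD-LOG":      // placeholder topic name
                    return "RadiusCompletionBolt";
                default:                       // unknown topic: fall back, as the empty default above does
                    return "CollectCompletedBolt";
            }
        }
    }

Switching directly on the config constant drops the redundant topic_name local and keeps the dead commented-out wiring out of the method.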

View File

@@ -19,7 +19,7 @@ import java.util.regex.Pattern;
* @author qidaijie
* @create 2018-08-13 15:11
*/
+@SuppressWarnings("all")
public class TransFormUtils {
private static Logger logger = Logger.getLogger(TransFormUtils.class);
private static Pattern WEB_PATTERN = Pattern.compile("[^\\\\.]+(\\.com\\.cn|\\.net\\.cn|\\.org\\.cn|\\.gov\\.cn|\\.com|\\.net|\\.cn|\\.org|\\.cc|\\.me|\\.tel|\\.mobi|\\.asia|\\.biz|\\.info|\\.name|\\.tv|\\.hk|\\.公司|\\.中国|\\.网络)");
@@ -34,6 +34,7 @@ public class TransFormUtils {
/**
* Parses the log and completes it
* Completes subscriber_id but not domain
+*
* @param message raw radius log
* @return the completed log
*/
@@ -50,19 +51,17 @@ public class TransFormUtils {
radiusSessionRecordLog.setCommon_server_asn(ipLookup.asnLookup(serverIp, true));
return JSONObject.toJSONString(radiusSessionRecordLog);
} catch (Exception e) {
-logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while parsing log", e);
+logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while parsing log");
+e.printStackTrace();
return "";
}
}
/**
* Parses the log and completes it
* Completes domain and subscriber_id
+*
* @param message raw Proxy log
* @return the completed log
*/
@@ -83,16 +82,17 @@ public class TransFormUtils {
proxySessionRecordLog.setHttp_domain(getTopDomain(null, proxySessionRecordLog.getHttp_host()));
return JSONObject.toJSONString(proxySessionRecordLog);
} catch (Exception e) {
-logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while parsing log", e);
+logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while parsing log");
+e.printStackTrace();
return "";
}
}
/**
* Parses the log and completes it
* Completes domain and subscriber_id
+*
* @param message raw Security log
* @return the completed log
*/
@@ -113,7 +113,8 @@ public class TransFormUtils {
securitySessionRecordLog.setHttp_domain(getTopDomain(securitySessionRecordLog.getSsl_sni(), securitySessionRecordLog.getHttp_host()));
return JSONObject.toJSONString(securitySessionRecordLog);
} catch (Exception e) {
-logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while parsing log", e);
+logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while parsing log");
+e.printStackTrace();
return "";
}
}
@@ -121,25 +122,26 @@ public class TransFormUtils {
/**
* Parses the log and completes it
* Completes domain and subscriber_id
+*
* @param message raw Security log
* @return the completed log
*/
public static String getCollectProtocolMessage(String message) {
-CollectProtocolRecordLog securitySessionRecordLog = JSONObject.parseObject(message, CollectProtocolRecordLog.class);
-String serverIp = securitySessionRecordLog.getCommon_server_ip();
-String clientIp = securitySessionRecordLog.getCommon_client_ip();
+CollectProtocolRecordLog collectProtocolRecordLog = JSONObject.parseObject(message, CollectProtocolRecordLog.class);
+String serverIp = collectProtocolRecordLog.getCommon_server_ip();
+String clientIp = collectProtocolRecordLog.getCommon_client_ip();
try {
-securitySessionRecordLog.setCommon_log_id(SnowflakeId.generateId());
-securitySessionRecordLog.setCommon_recv_time((System.currentTimeMillis() / 1000));
-securitySessionRecordLog.setCommon_server_location(ipLookup.countryLookup(serverIp));
-securitySessionRecordLog.setCommon_client_location(ipLookup.cityLookupDetail(clientIp));
-securitySessionRecordLog.setCommon_client_asn(ipLookup.asnLookup(clientIp, true));
-securitySessionRecordLog.setCommon_server_asn(ipLookup.asnLookup(serverIp, true));
+collectProtocolRecordLog.setCommon_log_id(SnowflakeId.generateId());
+collectProtocolRecordLog.setCommon_recv_time((System.currentTimeMillis() / 1000));
+collectProtocolRecordLog.setCommon_server_location(ipLookup.countryLookup(serverIp));
+collectProtocolRecordLog.setCommon_client_location(ipLookup.cityLookupDetail(clientIp));
+collectProtocolRecordLog.setCommon_client_asn(ipLookup.asnLookup(clientIp, true));
+collectProtocolRecordLog.setCommon_server_asn(ipLookup.asnLookup(serverIp, true));
//TODO integrate AAA data subscribe_id - completed on the data side
-securitySessionRecordLog.setCommon_subscriber_id(HbaseUtils.getAccount(clientIp));
+collectProtocolRecordLog.setCommon_subscriber_id(HbaseUtils.getAccount(clientIp));
// securitySessionRecordLog.setCommon_subscriber_id("aaaaa");
-securitySessionRecordLog.setHttp_domain(getTopDomain(securitySessionRecordLog.getSsl_sni(), securitySessionRecordLog.getHttp_host()));
-return JSONObject.toJSONString(securitySessionRecordLog);
+collectProtocolRecordLog.setHttp_domain(getTopDomain(collectProtocolRecordLog.getSsl_sni(), collectProtocolRecordLog.getHttp_host()));
+return JSONObject.toJSONString(collectProtocolRecordLog);
} catch (Exception e) {
logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while parsing log", e);
return "";