修改处理逻辑,去掉处理机IP与数据中心作为key的判定条件。

This commit is contained in:
wanglihui
2021-08-16 18:24:13 +08:00
parent e0de04886b
commit e89e1b08c9
13 changed files with 180 additions and 175 deletions

View File

@@ -29,7 +29,7 @@ public class CommonConfig {
public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num"); public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num");
public static final int FLINK_FIRST_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.first.agg.parallelism"); public static final int FLINK_FIRST_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.first.agg.parallelism");
public static final int FLINK_SECOND_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.second.agg.parallelism"); public static final int FLINK_DETECTION_MAP_PARALLELISM = CommonConfigurations.getIntProperty("flink.detection.map.parallelism");
public static final int FLINK_WATERMARK_MAX_ORDERNESS = CommonConfigurations.getIntProperty("flink.watermark.max.orderness"); public static final int FLINK_WATERMARK_MAX_ORDERNESS = CommonConfigurations.getIntProperty("flink.watermark.max.orderness");
public static final int FLINK_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("flink.window.max.time"); public static final int FLINK_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("flink.window.max.time");

View File

@@ -5,8 +5,6 @@ import java.io.Serializable;
public class DosMetricsLog implements Serializable { public class DosMetricsLog implements Serializable {
private long sketch_start_time; private long sketch_start_time;
private String common_sled_ip;
private String common_data_center;
private String attack_type; private String attack_type;
private String destination_ip; private String destination_ip;
private long session_rate; private long session_rate;
@@ -30,22 +28,6 @@ public class DosMetricsLog implements Serializable {
this.sketch_start_time = sketch_start_time; this.sketch_start_time = sketch_start_time;
} }
public String getCommon_sled_ip() {
return common_sled_ip;
}
public void setCommon_sled_ip(String common_sled_ip) {
this.common_sled_ip = common_sled_ip;
}
public String getCommon_data_center() {
return common_data_center;
}
public void setCommon_data_center(String common_data_center) {
this.common_data_center = common_data_center;
}
public String getAttack_type() { public String getAttack_type() {
return attack_type; return attack_type;
} }
@@ -90,8 +72,6 @@ public class DosMetricsLog implements Serializable {
public String toString() { public String toString() {
return "DosMetricsLog{" + return "DosMetricsLog{" +
"sketch_start_time=" + sketch_start_time + "sketch_start_time=" + sketch_start_time +
", common_sled_ip='" + common_sled_ip + '\'' +
", common_data_center='" + common_data_center + '\'' +
", attack_type='" + attack_type + '\'' + ", attack_type='" + attack_type + '\'' +
", destination_ip='" + destination_ip + '\'' + ", destination_ip='" + destination_ip + '\'' +
", session_rate=" + session_rate + ", session_rate=" + session_rate +

View File

@@ -3,47 +3,37 @@ package com.zdjizhi.etl;
import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosEventLog; import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosSketchLog; import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.utils.HbaseUtils;
import com.zdjizhi.utils.IpUtils; import com.zdjizhi.utils.IpUtils;
import com.zdjizhi.utils.SnowflakeId; import com.zdjizhi.utils.SnowflakeId;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.text.NumberFormat; import java.text.NumberFormat;
import java.text.ParseException; import java.text.ParseException;
import java.util.*; import java.util.*;
/**
* @author wlh
*/
public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> { public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
private static final Logger logger = LoggerFactory.getLogger(DosDetection.class); private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);
private Connection conn = null; private static Map<String, Map<String,List<Integer>>> baselineMap;
private Table table = null;
private Scan scan = null;
private Map<String, Map<String,List<Integer>>> baselineMap = new HashMap<>();
private final static int BASELINE_SIZE = 144; private final static int BASELINE_SIZE = 144;
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters){
readFromHbase(); baselineMap = HbaseUtils.baselineMap;
PERCENT_INSTANCE.setMinimumFractionDigits(2); PERCENT_INSTANCE.setMinimumFractionDigits(2);
} }
@Override @Override
public DosEventLog map(DosSketchLog value) throws Exception { public DosEventLog map(DosSketchLog value){
try { try {
String destinationIp = value.getDestination_ip(); String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type(); String attackType = value.getAttack_type();
@@ -77,61 +67,6 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return null; return null;
} }
private void prepareHbaseEnv() throws IOException {
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
config.set("hbase.client.retries.number", "3");
config.set("hbase.bulkload.retries.number", "3");
config.set("zookeeper.recovery.retry", "3");
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
conn = ConnectionFactory.createConnection(config);
table = conn.getTable(tableName);
scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
logger.info("连接hbase成功正在读取baseline数据");
}
private void readFromHbase() throws IOException {
prepareHbaseEnv();
logger.info("开始读取baseline数据");
ResultScanner rs = table.getScanner(scan);
for (Result result : rs) {
Map<String, List<Integer>> floodTypeMap = new HashMap<>();
String rowkey = Bytes.toString(result.getRow());
ArrayList<Integer> tcp = getArraylist(result,"TCP SYN Flood", "session_num");
ArrayList<Integer> udp = getArraylist(result,"UDP Flood", "session_num");
ArrayList<Integer> icmp = getArraylist(result,"ICMP Flood", "session_num");
ArrayList<Integer> dns = getArraylist(result,"DNS Amplification", "session_num");
floodTypeMap.put("TCP SYN Flood",tcp);
floodTypeMap.put("UDP Flood",udp);
floodTypeMap.put("ICMP Flood",icmp);
floodTypeMap.put("DNS Amplification",dns);
baselineMap.put(rowkey,floodTypeMap);
}
logger.info("格式化baseline数据成功读取IP共{}",baselineMap.size());
}
private static ArrayList<Integer> getArraylist(Result result,String family,String qualifier) throws IOException {
if (!result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier))){
return null;
}
ArrayWritable w = new ArrayWritable(IntWritable.class);
w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
return fromWritable(w);
}
private static ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = writable.get();
ArrayList<Integer> list = new ArrayList<>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
}
return list;
}
private DosEventLog getResult(DosSketchLog value, Severity severity, String percent){ private DosEventLog getResult(DosSketchLog value, Severity severity, String percent){
DosEventLog dosEventLog = new DosEventLog(); DosEventLog dosEventLog = new DosEventLog();
dosEventLog.setLog_id(SnowflakeId.generateId()); dosEventLog.setLog_id(SnowflakeId.generateId());

View File

@@ -3,7 +3,7 @@ package com.zdjizhi.etl;
import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosSketchLog; import com.zdjizhi.common.DosSketchLog;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -18,11 +18,11 @@ import static com.zdjizhi.sink.OutputStreamSink.outputTag;
/** /**
* @author 94976 * @author 94976
*/ */
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple4<String,String,String,String>, TimeWindow> { public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String,String>, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class); private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
@Override @Override
public void process(Tuple4<String,String, String, String> keys, public void process(Tuple2<String, String> keys,
Context context, Iterable<DosSketchLog> elements, Context context, Iterable<DosSketchLog> elements,
Collector<DosSketchLog> out) { Collector<DosSketchLog> out) {
DosSketchLog middleResult = getMiddleResult(keys, elements); DosSketchLog middleResult = getMiddleResult(keys, elements);
@@ -37,16 +37,14 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
} }
} }
private DosSketchLog getMiddleResult(Tuple4<String,String, String, String> keys,Iterable<DosSketchLog> elements){ private DosSketchLog getMiddleResult(Tuple2<String, String> keys,Iterable<DosSketchLog> elements){
DosSketchLog midResuleLog = new DosSketchLog(); DosSketchLog midResuleLog = new DosSketchLog();
Tuple6<Long, Long, Long,String,Long,Long> values = sketchAggregate(elements); Tuple6<Long, Long, Long,String,Long,Long> values = sketchAggregate(elements);
try { try {
if (values != null){ if (values != null){
midResuleLog.setCommon_sled_ip(keys.f0); midResuleLog.setAttack_type(keys.f0);
midResuleLog.setCommon_data_center(keys.f1); midResuleLog.setDestination_ip(keys.f1);
midResuleLog.setDestination_ip(keys.f3);
midResuleLog.setAttack_type(keys.f2);
midResuleLog.setSketch_start_time(values.f4); midResuleLog.setSketch_start_time(values.f4);
midResuleLog.setSketch_duration(values.f5); midResuleLog.setSketch_duration(values.f5);
midResuleLog.setSource_ip(values.f3); midResuleLog.setSource_ip(values.f3);

View File

@@ -17,6 +17,9 @@ import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
/**
* @author wlh
*/
public class ParseSketchLog { public class ParseSketchLog {
private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class); private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
@@ -37,20 +40,16 @@ public class ParseSketchLog {
private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> { private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
@Override @Override
public void flatMap(String s, Collector<DosSketchLog> collector) throws Exception { public void flatMap(String s, Collector<DosSketchLog> collector) {
try { try {
if (StringUtil.isNotBlank(s)){ if (StringUtil.isNotBlank(s)){
HashMap<String, Object> sketchSource = (HashMap<String, Object>) JsonMapper.fromJsonString(s, Object.class); HashMap<String, Object> sketchSource = (HashMap<String, Object>) JsonMapper.fromJsonString(s, Object.class);
String commonSledIp = sketchSource.get("common_sled_ip").toString();
String commonDataCenter = sketchSource.get("common_data_center").toString();
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString()); long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString()); long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
String attackType = sketchSource.get("attack_type").toString(); String attackType = sketchSource.get("attack_type").toString();
ArrayList<HashMap<String, Object>> reportIpList = (ArrayList<HashMap<String, Object>>) sketchSource.get("report_ip_list"); ArrayList<HashMap<String, Object>> reportIpList = (ArrayList<HashMap<String, Object>>) sketchSource.get("report_ip_list");
for (HashMap<String, Object> obj : reportIpList) { for (HashMap<String, Object> obj : reportIpList) {
DosSketchLog dosSketchLog = new DosSketchLog(); DosSketchLog dosSketchLog = new DosSketchLog();
dosSketchLog.setCommon_sled_ip(commonSledIp);
dosSketchLog.setCommon_data_center(commonDataCenter);
dosSketchLog.setSketch_start_time(sketchStartTime); dosSketchLog.setSketch_start_time(sketchStartTime);
dosSketchLog.setSketch_duration(sketchDuration); dosSketchLog.setSketch_duration(sketchDuration);
dosSketchLog.setAttack_type(attackType); dosSketchLog.setAttack_type(attackType);

View File

@@ -14,8 +14,6 @@ class TrafficServerIpMetrics {
static DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) { static DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) {
DosMetricsLog dosMetricsLog = new DosMetricsLog(); DosMetricsLog dosMetricsLog = new DosMetricsLog();
dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis()/1000)); dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis()/1000));
dosMetricsLog.setCommon_sled_ip(midResuleLog.getCommon_sled_ip());
dosMetricsLog.setCommon_data_center(midResuleLog.getCommon_data_center());
dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip()); dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip());
dosMetricsLog.setAttack_type(midResuleLog.getAttack_type()); dosMetricsLog.setAttack_type(midResuleLog.getAttack_type());
dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions()); dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions());
@@ -35,7 +33,7 @@ class TrafficServerIpMetrics {
} }
public static void main(String[] args) { public static void main(String[] args) {
// System.out.println(getPartitionNumByIp("146.177.223.43")); System.out.println(getPartitionNumByIp("146.177.223.43"));
System.out.println("146.177.223.43".hashCode()); System.out.println("146.177.223.43".hashCode());
} }

View File

@@ -2,6 +2,10 @@ package com.zdjizhi.main;
import com.zdjizhi.sink.OutputStreamSink; import com.zdjizhi.sink.OutputStreamSink;
/**
* @author wlh
* 程序主类入口
*/
public class DosDetectionApplication { public class DosDetectionApplication {
public static void main(String[] args) { public static void main(String[] args) {

View File

@@ -8,11 +8,8 @@ import com.zdjizhi.etl.DosDetection;
import com.zdjizhi.etl.EtlProcessFunction; import com.zdjizhi.etl.EtlProcessFunction;
import com.zdjizhi.etl.ParseSketchLog; import com.zdjizhi.etl.ParseSketchLog;
import com.zdjizhi.utils.FlinkEnvironmentUtils; import com.zdjizhi.utils.FlinkEnvironmentUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
@@ -20,8 +17,6 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.*;
/** /**
* @author 94976 * @author 94976
*/ */
@@ -43,11 +38,7 @@ public class OutputStreamSink {
} }
private static SingleOutputStreamOperator<DosEventLog> getOutputSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){ private static SingleOutputStreamOperator<DosEventLog> getOutputSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
return middleStream.map(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
return middleStream.keyBy(new SecondKeySelector())
.reduce(new SecondReduceFunc())
.map(new DosDetection())
.setParallelism(CommonConfig.FLINK_SECOND_AGG_PARALLELISM);
} }
private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){ private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){
@@ -58,56 +49,13 @@ public class OutputStreamSink {
.setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM); .setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM);
} }
private static String groupUniqSourceIp(String sourceIp1,String sourceIp2){ private static class FirstKeySelector implements KeySelector<DosSketchLog, Tuple2<String, String>>{
HashSet<String> sourceIpSet = new HashSet<>();
Collections.addAll(sourceIpSet, (sourceIp1 + "," + sourceIp2).split(","));
if (sourceIpSet.size() > CommonConfig.SOURCE_IP_LIST_LIMIT){
return StringUtils.join(takeUniqLimit(sourceIpSet,CommonConfig.SOURCE_IP_LIST_LIMIT),",");
}
return StringUtils.join(sourceIpSet,",");
}
private static<T> Collection<T> takeUniqLimit(Collection<T> collection, int limit){
int i =0;
Collection<T> newSet = new HashSet<>();
for (T t:collection){
if (i < limit){
newSet.add(t);
i += 1;
}
}
return newSet;
}
private static class FirstKeySelector implements KeySelector<DosSketchLog, Tuple4<String, String, String, String>>{
@Override @Override
public Tuple4<String, String, String, String> getKey(DosSketchLog dosSketchLog) throws Exception { public Tuple2<String, String> getKey(DosSketchLog dosSketchLog){
return Tuple4.of(
dosSketchLog.getCommon_sled_ip(),
dosSketchLog.getCommon_data_center(),
dosSketchLog.getAttack_type(),
dosSketchLog.getDestination_ip());
}
}
private static class SecondKeySelector implements KeySelector<DosSketchLog, Tuple2<String, String>> {
@Override
public Tuple2<String, String> getKey(DosSketchLog dosSketchLog) throws Exception {
return Tuple2.of( return Tuple2.of(
dosSketchLog.getAttack_type(), dosSketchLog.getAttack_type(),
dosSketchLog.getDestination_ip()); dosSketchLog.getDestination_ip());
} }
} }
private static class SecondReduceFunc implements ReduceFunction<DosSketchLog> {
@Override
public DosSketchLog reduce(DosSketchLog value1, DosSketchLog value2) throws Exception {
value1.setSketch_sessions((value1.getSketch_sessions()+value2.getSketch_sessions())/2);
value1.setSketch_bytes((value1.getSketch_bytes()+value2.getSketch_bytes())/2);
value1.setSketch_packets((value1.getSketch_packets()+value2.getSketch_packets())/2);
value1.setSource_ip(groupUniqSourceIp(value1.getSource_ip(),value2.getSource_ip()));
return value1;
}
}
} }

View File

@@ -9,6 +9,9 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties; import java.util.Properties;
/**
* @author wlh
*/
public class DosSketchSource { public class DosSketchSource {
private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv; private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;

View File

@@ -0,0 +1,23 @@
package com.zdjizhi.utils;
import java.util.Collection;
import java.util.HashSet;
/**
* @author wlh
* 扩展集合处理工具
*/
public class CollectionUtils {
public static<T> Collection<T> takeUniqueLimit(Collection<T> collection, int limit){
int i =0;
Collection<T> newSet = new HashSet<>();
for (T t:collection){
if (i < limit){
newSet.add(t);
i += 1;
}
}
return newSet;
}
}

View File

@@ -1,5 +1,108 @@
package com.zdjizhi.utils; package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.*;
/**
* @author wlh
*/
public class HbaseUtils { public class HbaseUtils {
private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class);
private static Table table = null;
private static Scan scan = null;
public static Map<String, Map<String,List<Integer>>> baselineMap = new HashMap<>();
static {
readFromHbase();
}
private static void prepareHbaseEnv() throws IOException {
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
config.set("hbase.client.retries.number", "3");
config.set("hbase.bulkload.retries.number", "3");
config.set("zookeeper.recovery.retry", "3");
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
Connection conn = ConnectionFactory.createConnection(config);
table = conn.getTable(tableName);
scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
logger.info("连接hbase成功正在读取baseline数据");
}
public static void main(String[] args) {
Set<String> keySet = baselineMap.keySet();
for (String key:keySet){
Map<String, List<Integer>> stringListMap = baselineMap.get(key);
Set<String> typeSet = stringListMap.keySet();
for (String type:typeSet){
List<Integer> lines = stringListMap.get(type);
if (lines != null){
System.out.println(key+"--"+type+"--"+Arrays.toString(lines.toArray()));
}
}
}
System.out.println(baselineMap.size());
}
private static void readFromHbase(){
try {
prepareHbaseEnv();
logger.info("开始读取baseline数据");
ResultScanner rs = table.getScanner(scan);
for (Result result : rs) {
Map<String, List<Integer>> floodTypeMap = new HashMap<>();
String rowkey = Bytes.toString(result.getRow());
ArrayList<Integer> tcp = getArraylist(result,"TCP SYN Flood", "session_rate");
ArrayList<Integer> udp = getArraylist(result,"UDP Flood", "session_rate");
ArrayList<Integer> icmp = getArraylist(result,"ICMP Flood", "session_rate");
ArrayList<Integer> dns = getArraylist(result,"DNS Amplification", "session_rate");
floodTypeMap.put("TCP SYN Flood",tcp);
floodTypeMap.put("UDP Flood",udp);
floodTypeMap.put("ICMP Flood",icmp);
floodTypeMap.put("DNS Amplification",dns);
baselineMap.put(rowkey,floodTypeMap);
}
logger.info("格式化baseline数据成功读取IP共{}",baselineMap.size());
}catch (Exception e){
logger.error("读取hbase数据失败",e);
}
}
private static ArrayList<Integer> getArraylist(Result result,String family,String qualifier) throws IOException {
if (!result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier))){
return null;
}
ArrayWritable w = new ArrayWritable(IntWritable.class);
w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
return fromWritable(w);
}
private static ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = writable.get();
ArrayList<Integer> list = new ArrayList<>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
}
return list;
}
} }

View File

@@ -15,7 +15,13 @@ public class IpUtils {
.build(); .build();
public static void main(String[] args) { public static void main(String[] args) {
System.out.println(ipLookup.countryLookup("61.128.159.186")); System.out.println(ipLookup.countryLookup("94.23.23.52"));
// String ips = "192.168.50.23,192.168.50.45,192.168.56.9,192.168.56.8,192.168.50.58,192.168.56.7,192.168.56.6,192.168.50.40,192.168.50.19,192.168.50.6,192.168.50.4,192.168.56.17,192.168.50.27,192.168.50.26,192.168.50.18,192.168.56.3,192.168.56.10";
// for (String ip:ips.split(",")){
// System.out.println(ip+"--"+ipLookup.countryLookup(ip));
// }
} }

View File

@@ -11,28 +11,34 @@ kafka.input.parallelism=1
kafka.input.topic.name=DOS-SKETCH-LOG kafka.input.topic.name=DOS-SKETCH-LOG
#输入kafka地址 #输入kafka地址
kafka.input.bootstrap.servers=192.168.44.12:9092 #kafka.input.bootstrap.servers=192.168.44.12:9092
kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#读取kafka group id #读取kafka group id
kafka.input.group.id=2108041755 kafka.input.group.id=2108161121
#kafka.input.group.id=dos-detection-job-210813-1
#发送kafka metrics并行度大小 #发送kafka metrics并行度大小
kafka.output.metric.parallelism=1 kafka.output.metric.parallelism=1
#发送kafka metrics topic名 #发送kafka metrics topic名
kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS-LOG #kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS-LOG
kafka.output.metric.topic.name=test
#发送kafka event并行度大小 #发送kafka event并行度大小
kafka.output.event.parallelism=1 kafka.output.event.parallelism=1
#发送kafka event topic名 #发送kafka event topic名
kafka.output.event.topic.name=DOS-EVENT-LOG #kafka.output.event.topic.name=DOS-EVENT-LOG
kafka.output.event.topic.name=test
#kafka输出地址 #kafka输出地址
kafka.output.bootstrap.servers=192.168.44.12:9092 kafka.output.bootstrap.servers=192.168.44.12:9092
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#zookeeper地址 #zookeeper地址
hbase.zookeeper.quorum=192.168.44.12:2181 #hbase.zookeeper.quorum=192.168.44.12:2181
hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
#hbase客户端处理时间 #hbase客户端处理时间
hbase.client.operation.timeout=30000 hbase.client.operation.timeout=30000
@@ -44,11 +50,11 @@ hbase.baseline.table.name=ddos_traffic_baselines
#读取baseline限制 #读取baseline限制
hbase.baseline.total.num=1000000 hbase.baseline.total.num=1000000
#设置首次聚合并行度,4个key #设置聚合并行度,2个key
flink.first.agg.parallelism=1 flink.first.agg.parallelism=1
#设置二次聚合并行度2个key #设置结果判定并行度
flink.second.agg.parallelism=1 flink.detection.map.parallelism=1
#watermark延迟 #watermark延迟
flink.watermark.max.orderness=1 flink.watermark.max.orderness=1
@@ -65,7 +71,9 @@ destination.ip.partition.num=10000
data.center.id.num=15 data.center.id.num=15
#IP mmdb库路径 #IP mmdb库路径
ip.mmdb.path=D:\\data\\dat_test\\ #ip.mmdb.path=D:\\data\\dat_test\\
ip.mmdb.path=D:\\data\\
#ip.mmdb.path=/home/bigdata/topology/dat/
#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/ #ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
#基于baseline判定dos攻击的上下限 #基于baseline判定dos攻击的上下限