From 177e7461cc679c8ce58c5859410e5710ac6219de Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Thu, 21 Oct 2021 18:27:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=9E=84=E5=BB=BAbaseline?= =?UTF-8?q?=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../zdjizhi/common/DosBaselineThreshold.java | 43 ++++++++ .../java/com/zdjizhi/etl/DosDetection.java | 77 +++++--------- .../zdjizhi/etl/ParseBaselineThreshold.java | 100 ++++++++++++++++++ .../java/com/zdjizhi/utils/HbaseUtils.java | 78 +------------- 4 files changed, 171 insertions(+), 127 deletions(-) create mode 100644 src/main/java/com/zdjizhi/common/DosBaselineThreshold.java create mode 100644 src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java diff --git a/src/main/java/com/zdjizhi/common/DosBaselineThreshold.java b/src/main/java/com/zdjizhi/common/DosBaselineThreshold.java new file mode 100644 index 0000000..e8bc228 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/DosBaselineThreshold.java @@ -0,0 +1,43 @@ +package com.zdjizhi.common; + +import java.io.Serializable; +import java.util.ArrayList; + +public class DosBaselineThreshold implements Serializable { + private ArrayList session_rate; + private Integer session_rate_baseline_type; + private Integer session_rate_default_value; + + public ArrayList getSession_rate() { + return session_rate; + } + + public void setSession_rate(ArrayList session_rate) { + this.session_rate = session_rate; + } + + public Integer getSession_rate_baseline_type() { + return session_rate_baseline_type; + } + + public void setSession_rate_baseline_type(Integer session_rate_baseline_type) { + this.session_rate_baseline_type = session_rate_baseline_type; + } + + public Integer getSession_rate_default_value() { + return session_rate_default_value; + } + + public void setSession_rate_default_value(Integer session_rate_default_value) { + this.session_rate_default_value = session_rate_default_value; + } + + @Override + public String toString() { + return "DosBaselineThreshold{" + + "session_rate=" + session_rate + + ", session_rate_baseline_type=" + session_rate_baseline_type + + ", session_rate_default_value=" + session_rate_default_value + + '}'; + } +} diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index eace1c3..63960cd 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -1,9 +1,6 @@ package com.zdjizhi.etl; -import com.zdjizhi.common.CommonConfig; -import com.zdjizhi.common.DosDetectionThreshold; -import com.zdjizhi.common.DosEventLog; -import com.zdjizhi.common.DosSketchLog; +import com.zdjizhi.common.*; import com.zdjizhi.utils.*; import inet.ipaddr.IPAddress; import inet.ipaddr.IPAddressString; @@ -11,7 +8,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.text.StrBuilder; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap; import org.slf4j.Logger; @@ -30,11 +26,15 @@ import java.util.concurrent.TimeUnit; public class DosDetection extends RichMapFunction { private static final Logger logger = LoggerFactory.getLogger(DosDetection.class); - private static Map, Integer>>> baselineMap = new HashMap<>(); - private final static int BASELINE_SIZE = 144; + private static Map> baselineMap = new HashMap<>(); private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); private HashMap> thresholdRangeMap; + private final static int BASELINE_SIZE = 144; + private final static int STATIC_CONDITION_TYPE = 1; + private final static int BASELINE_CONDITION_TYPE = 2; + private final static int SENSITIVITY_CONDITION_TYPE = 3; + @Override public void open(Configuration parameters) { ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, @@ -43,7 +43,7 @@ public class DosDetection extends RichMapFunction { executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0, CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES); - executorService.scheduleAtFixedRate(() -> baselineMap = HbaseUtils.readFromHbase(), 0, + executorService.scheduleAtFixedRate(() -> baselineMap = ParseBaselineThreshold.readFromHbase(), 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS); } catch (Exception e) { logger.error("定时器任务执行失败", e); @@ -92,8 +92,8 @@ public class DosDetection extends RichMapFunction { String attackType = value.getAttack_type(); long sketchSessions = value.getSketch_sessions(); if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) { - Tuple2, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType); - Integer base = getBaseValue(floodTypeTup, value); + DosBaselineThreshold dosBaselineThreshold = baselineMap.get(destinationIp).get(attackType); + Integer base = getBaseValue(dosBaselineThreshold, value); result = getDosEventLog(value, base, sketchSessions - base, 2, "sessions"); } return result; @@ -124,7 +124,7 @@ public class DosDetection extends RichMapFunction { double percent = getDiffPercent(diff, base); Severity severity = judgeSeverity(percent); if (severity != Severity.NORMAL) { - if (type == 2 && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD) { + if (type == BASELINE_CONDITION_TYPE && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD) { logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value); } else { result = getResult(value, base, severity, percent, type, tag); @@ -156,12 +156,12 @@ public class DosDetection extends RichMapFunction { return dosEventLog; } - private Integer getBaseValue(Tuple2, Integer> floodTypeTup, DosSketchLog value) { + private Integer getBaseValue(DosBaselineThreshold dosBaselineThreshold, DosSketchLog value) { Integer base = 0; try { - if (floodTypeTup != null) { - ArrayList baselines = floodTypeTup.f0; - Integer defaultVaule = floodTypeTup.f1; + if (dosBaselineThreshold != null) { + ArrayList baselines = dosBaselineThreshold.getSession_rate(); + Integer defaultVaule = dosBaselineThreshold.getSession_rate_default_value(); if (baselines != null && baselines.size() == BASELINE_SIZE) { int timeIndex = getCurrentTimeIndex(value.getSketch_start_time()); base = baselines.get(timeIndex); @@ -179,25 +179,25 @@ public class DosDetection extends RichMapFunction { private String getConditions(String percent, long base, long sessions, int type, String tag) { switch (type) { - case 1: + case STATIC_CONDITION_TYPE: return new StrBuilder() .append(tag).append(" > ") .append(base).append(" ") .append(tag).append("/s") .toString(); - case 2: + case BASELINE_CONDITION_TYPE: return new StrBuilder() .append(tag).append(" > ") .append(percent).append(" of baseline") .toString(); - case 3: + case SENSITIVITY_CONDITION_TYPE: return new StrBuilder() .append(sessions).append(" ") .append(tag).append("/s Unusually high ") .append(StringUtils.capitalize(tag)) .toString(); default: - throw new IllegalArgumentException("Illegal Argument " + type + ", known types = [1,2,3]"); + throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]"); } } @@ -267,49 +267,24 @@ public class DosDetection extends RichMapFunction { /** * 判断严重程度枚举类型 */ - CRITICAL("Critical", 5), - SEVERE("Severe", 4), - MAJOR("Major", 3), - WARNING("Warning", 2), - MINOR("Minor", 1), - NORMAL("Normal", 0); + CRITICAL("Critical"), + SEVERE("Severe"), + MAJOR("Major"), + WARNING("Warning"), + MINOR("Minor"), + NORMAL("Normal"); private final String severity; - private final int score; @Override public String toString() { return this.severity; } - Severity(String severity, int score) { + Severity(String severity) { this.severity = severity; - this.score = score; } } - @Deprecated - private DosEventLog mergeFinalResult(Tuple2 eventLogByBaseline, Tuple2 eventLogByStaticThreshold) { - if (eventLogByBaseline.f0.score > eventLogByStaticThreshold.f0.score) { - logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}", eventLogByBaseline, eventLogByStaticThreshold); - return mergeCondition(eventLogByBaseline.f1, eventLogByStaticThreshold.f1); - } else { - logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}", eventLogByStaticThreshold, eventLogByBaseline); - return mergeCondition(eventLogByStaticThreshold.f1, eventLogByBaseline.f1); - } - } - - @Deprecated - private DosEventLog mergeCondition(DosEventLog log1, DosEventLog log2) { - if (log1 != null && log2 != null) { - String conditions1 = log1.getConditions(); - String conditions2 = log2.getConditions(); - log1.setConditions(conditions1 + " and " + conditions2); - } else if (log1 == null && log2 != null) { - log1 = log2; - } - return log1; - } - } diff --git a/src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java b/src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java new file mode 100644 index 0000000..0db4bdf --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java @@ -0,0 +1,100 @@ +package com.zdjizhi.etl; + +import com.zdjizhi.common.CommonConfig; +import com.zdjizhi.common.DosBaselineThreshold; +import com.zdjizhi.utils.HbaseUtils; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class ParseBaselineThreshold { + + private static final Logger logger = LoggerFactory.getLogger(ParseBaselineThreshold.class); + private static ArrayList floodTypeList = new ArrayList<>(); + + private static Table table = null; + private static Scan scan = null; + + static { + floodTypeList.add("TCP SYN Flood"); + floodTypeList.add("UDP Flood"); + floodTypeList.add("ICMP Flood"); + floodTypeList.add("DNS Flood"); + } + + private static void prepareHbaseEnv() throws IOException { + org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create(); + + config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM); + config.set("hbase.client.retries.number", "3"); + config.set("hbase.bulkload.retries.number", "3"); + config.set("zookeeper.recovery.retry", "3"); + config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT); + config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + + TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME); + Connection conn = ConnectionFactory.createConnection(config); + table = conn.getTable(tableName); + scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM); + logger.info("连接hbase成功,正在读取baseline数据"); + } + + + static Map> readFromHbase() { + Map> baselineMap = new HashMap<>(); + try { + prepareHbaseEnv(); + logger.info("开始读取baseline数据"); + ResultScanner rs = table.getScanner(scan); + for (Result result : rs) { + Map floodTypeMap = new HashMap<>(); + String rowkey = Bytes.toString(result.getRow()); + for (String type:floodTypeList){ + DosBaselineThreshold baselineThreshold = new DosBaselineThreshold(); + ArrayList sessionRate = HbaseUtils.getArraylist(result, type, "session_rate"); + if (sessionRate != null && !sessionRate.isEmpty()){ + Integer defaultValue = HbaseUtils.getIntegerValue(result, type, "session_rate_default_value"); + Integer rateBaselineType = HbaseUtils.getIntegerValue(result, type, "session_rate_baseline_type"); + baselineThreshold.setSession_rate(sessionRate); + baselineThreshold.setSession_rate_default_value(defaultValue); + baselineThreshold.setSession_rate_baseline_type(rateBaselineType); + floodTypeMap.put(type,baselineThreshold); + } + } + baselineMap.put(rowkey, floodTypeMap); + } + logger.info("格式化baseline数据成功,读取IP共:{}", baselineMap.size()); + } catch (Exception e) { + logger.error("读取hbase数据失败", e); + } + return baselineMap; + } + + public static void main(String[] args) { + Map> baselineMap = readFromHbase(); + Set keySet = baselineMap.keySet(); + for (String key : keySet) { + Map stringTuple2Map = baselineMap.get(key); + Set strings = stringTuple2Map.keySet(); + for (String s:strings){ + DosBaselineThreshold dosBaselineThreshold = stringTuple2Map.get(s); + System.out.println(key+"---"+s+"---"+dosBaselineThreshold); + } + } + System.out.println(baselineMap.size()); + } + + + + +} diff --git a/src/main/java/com/zdjizhi/utils/HbaseUtils.java b/src/main/java/com/zdjizhi/utils/HbaseUtils.java index 860ba55..428298b 100644 --- a/src/main/java/com/zdjizhi/utils/HbaseUtils.java +++ b/src/main/java/com/zdjizhi/utils/HbaseUtils.java @@ -1,17 +1,10 @@ package com.zdjizhi.utils; -import com.zdjizhi.common.CommonConfig; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -22,75 +15,8 @@ import java.util.*; * @author wlh */ public class HbaseUtils { - private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class); - private static Table table = null; - private static Scan scan = null; - private static ArrayList floodTypeList = new ArrayList<>(); - static { - floodTypeList.add("TCP SYN Flood"); - floodTypeList.add("UDP Flood"); - floodTypeList.add("ICMP Flood"); - floodTypeList.add("DNS Flood"); - } - - private static void prepareHbaseEnv() throws IOException { - org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create(); - - config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM); - config.set("hbase.client.retries.number", "3"); - config.set("hbase.bulkload.retries.number", "3"); - config.set("zookeeper.recovery.retry", "3"); - config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT); - config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); - - TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME); - Connection conn = ConnectionFactory.createConnection(config); - table = conn.getTable(tableName); - scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM); - logger.info("连接hbase成功,正在读取baseline数据"); - } - - public static void main(String[] args) { - Map, Integer>>> baselineMap = readFromHbase(); - Set keySet = baselineMap.keySet(); - for (String key : keySet) { - Map, Integer>> stringTuple2Map = baselineMap.get(key); - Set strings = stringTuple2Map.keySet(); - for (String s:strings){ - Tuple2, Integer> arrayListIntegerTuple2 = stringTuple2Map.get(s); - System.out.println(key+"---"+s+"---"+arrayListIntegerTuple2.f0+"---"+arrayListIntegerTuple2.f1); - } - } - System.out.println(baselineMap.size()); - } - - public static Map, Integer>>> readFromHbase() { - Map, Integer>>> baselineMap = new HashMap<>(); - try { - prepareHbaseEnv(); - logger.info("开始读取baseline数据"); - ResultScanner rs = table.getScanner(scan); - for (Result result : rs) { - Map, Integer>> floodTypeMap = new HashMap<>(); - String rowkey = Bytes.toString(result.getRow()); - for (String type:floodTypeList){ - ArrayList sessionRate = getArraylist(result, type, "session_rate"); - if (sessionRate != null && !sessionRate.isEmpty()){ - Integer defaultValue = getDefaultValue(result, type, "session_rate_default_value"); - floodTypeMap.put(type,Tuple2.of(sessionRate, defaultValue)); - } - } - baselineMap.put(rowkey, floodTypeMap); - } - logger.info("格式化baseline数据成功,读取IP共:{}", baselineMap.size()); - } catch (Exception e) { - logger.error("读取hbase数据失败", e); - } - return baselineMap; - } - - private static Integer getDefaultValue(Result result, String family, String qualifier) { + public static Integer getIntegerValue(Result result, String family, String qualifier) { byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)); if (value != null){ return Bytes.toInt(value); @@ -98,7 +24,7 @@ public class HbaseUtils { return 1; } - private static ArrayList getArraylist(Result result, String family, String qualifier) throws IOException { + public static ArrayList getArraylist(Result result, String family, String qualifier) throws IOException { if (containsColumn(result, family, qualifier)) { ArrayWritable w = new ArrayWritable(IntWritable.class); w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));