diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index 2fa3bfa..eace1c3 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -33,7 +33,7 @@ public class DosDetection extends RichMapFunction { private static Map, Integer>>> baselineMap = new HashMap<>(); private final static int BASELINE_SIZE = 144; private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); - private TreeRangeMap> thresholdRangeMap; + private HashMap> thresholdRangeMap; @Override public void open(Configuration parameters) { @@ -58,14 +58,14 @@ public class DosDetection extends RichMapFunction { String destinationIp = value.getDestination_ip(); String attackType = value.getAttack_type(); IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress(); - Map thresholdMap = thresholdRangeMap.get(destinationIpAddress); + DosDetectionThreshold threshold = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress); logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType); - if ((thresholdMap == null || !thresholdMap.containsKey(attackType)) && baselineMap.containsKey(destinationIp)) { + if (threshold == null && baselineMap.containsKey(destinationIp)) { finalResult = getDosEventLogByBaseline(value); - } else if ((thresholdMap == null || !thresholdMap.containsKey(attackType)) && !baselineMap.containsKey(destinationIp)) { + } else if (threshold == null && !baselineMap.containsKey(destinationIp)) { finalResult = getDosEventLogBySensitivityThreshold(value); - } else if (thresholdMap != null && thresholdMap.containsKey(attackType)) { - finalResult = getDosEventLogByStaticThreshold(value, thresholdMap); + } else if (threshold != null) { + finalResult = getDosEventLogByStaticThreshold(value, threshold); } else { logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType); } @@ -99,24 +99,18 @@ public class DosDetection extends RichMapFunction { return result; } - private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, Map thresholdMap) { - DosEventLog result = null; - String attackType = value.getAttack_type(); - long base, diff; - if (thresholdMap.containsKey(attackType)) { - DosDetectionThreshold threshold = thresholdMap.get(attackType); - base = threshold.getSessionsPerSec(); - diff = value.getSketch_sessions() - base; - result = getDosEventLog(value, base, diff, 1, "sessions"); + private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) { + long base = threshold.getSessionsPerSec(); + long diff = value.getSketch_sessions() - base; + DosEventLog result = getDosEventLog(value, base, diff, 1, "sessions"); + if (result == null) { + base = threshold.getPacketsPerSec(); + diff = value.getSketch_packets() - base; + result = getDosEventLog(value, base, diff, 1, "packets"); if (result == null) { - base = threshold.getPacketsPerSec(); - diff = value.getSketch_packets() - base; - result = getDosEventLog(value, base, diff, 1, "packets"); - if (result == null) { - base = threshold.getBitsPerSec(); - diff = value.getSketch_bytes() - base; - result = getDosEventLog(value, base, diff, 1, "bits"); - } + base = threshold.getBitsPerSec(); + diff = value.getSketch_bytes() - base; + result = getDosEventLog(value, base, diff, 1, "bits"); } } return result; @@ -134,7 +128,7 @@ public class DosDetection extends RichMapFunction { logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value); } else { result = getResult(value, base, severity, percent, type, tag); - logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}检测,日志详情\n {}", destinationIp, attackType, base, percent, type, result); + logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result); } } else { logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value); diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java index ecbd8b6..bed7fdb 100644 --- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java @@ -21,6 +21,9 @@ import static com.zdjizhi.sink.OutputStreamSink.outputTag; public class EtlProcessFunction extends ProcessWindowFunction, TimeWindow> { private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class); + private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0"; + private static final String EMPTY_SOURCE_IP_IPV6 = "::"; + @Override public void process(Tuple2 keys, Context context, Iterable elements, @@ -70,7 +73,7 @@ public class EtlProcessFunction extends ProcessWindowFunction> createStaticThreshold() { - TreeRangeMap> thresholdRangeMap = TreeRangeMap.create(); + static HashMap> createStaticThreshold() { + HashMap> thresholdRangeMap = new HashMap<>(4); try { ArrayList dosDetectionThreshold = getDosDetectionThreshold(); if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) { for (DosDetectionThreshold threshold : dosDetectionThreshold) { + String attackType = threshold.getAttackType(); + TreeRangeMap treeRangeMap = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()); ArrayList serverIpList = threshold.getServerIpList(); for (String sip : serverIpList) { IPAddressString ipAddressString = new IPAddressString(sip); if (ipAddressString.isIPAddress()) { IPAddress address = ipAddressString.getAddress(); - Map floodTypeThresholdMap = new HashMap<>(); - floodTypeThresholdMap.put(threshold.getAttackType(), threshold); if (address.isPrefixed()) { IPAddress lower = address.getLower(); IPAddress upper = address.getUpper(); @@ -161,40 +161,27 @@ public class ParseStaticThreshold { lower = address.adjustPrefixLength(address.getBitCount()); upper = address.toMaxHost().withoutPrefixLength(); } - Map.Entry, Map> lowerEntry = thresholdRangeMap.getEntry(lower); - Map.Entry, Map> upperEntry = thresholdRangeMap.getEntry(upper); + Map.Entry, DosDetectionThreshold> lowerEntry = treeRangeMap.getEntry(lower); + Map.Entry, DosDetectionThreshold> upperEntry = treeRangeMap.getEntry(upper); if (lowerEntry != null && upperEntry == null) { Range lowerEntryKey = lowerEntry.getKey(); - Map lowerEntryValue = lowerEntry.getValue(); - lowerEntryValue.put(threshold.getAttackType(), threshold); - thresholdRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue); - thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap); + DosDetectionThreshold lowerEntryValue = lowerEntry.getValue(); + treeRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue); + treeRangeMap.put(Range.closed(lower, upper), threshold); } else if (lowerEntry == null && upperEntry != null) { Range upperEntryKey = upperEntry.getKey(); - Map upperEntryValue = upperEntry.getValue(); - upperEntryValue.put(threshold.getAttackType(), threshold); - thresholdRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue); - thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap); + DosDetectionThreshold upperEntryValue = upperEntry.getValue(); + treeRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue); + treeRangeMap.put(Range.closed(lower, upper), threshold); } else { - thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap); + treeRangeMap.put(Range.closed(lower, upper), threshold); } } else { - Map.Entry, Map> entry = thresholdRangeMap.getEntry(address); - if (entry != null) { - Range entryKey = entry.getKey(); - Map entryValue = entry.getValue(); - if (entryKey.lowerEndpoint() == entryKey.upperEndpoint()) { - entryValue.put(threshold.getAttackType(), threshold); - thresholdRangeMap.put(Range.closed(address, address), entryValue); - } else { - thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap); - } - } else { - thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap); - } + treeRangeMap.put(Range.closed(address, address), threshold); } } } + thresholdRangeMap.put(attackType,treeRangeMap); } } } catch (Exception e) { @@ -210,16 +197,17 @@ public class ParseStaticThreshold { System.out.println("------------------------"); - TreeRangeMap> staticThreshold = createStaticThreshold(); + HashMap> staticThreshold = createStaticThreshold(); System.out.println("------------------------"); - Map, Map> rangeMapMap = staticThreshold.asMapOfRanges(); - for (Range range : rangeMapMap.keySet()) { - Map thresholdMap = rangeMapMap.get(range); - for (String type : thresholdMap.keySet()) { - DosDetectionThreshold threshold = thresholdMap.get(type); - System.out.println(range + "---" + type + "---" + threshold); + + for (String type : staticThreshold.keySet()) { + Map, DosDetectionThreshold> asMapOfRanges = staticThreshold.get(type).asMapOfRanges(); + for (Range range : asMapOfRanges.keySet()) { + DosDetectionThreshold threshold = asMapOfRanges.get(range); + System.out.println(type + "---" + range + "---" + threshold); } + System.out.println("------------------------"); } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 97286f7..15cc646 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -60,7 +60,7 @@ flink.detection.map.parallelism=1 flink.watermark.max.orderness=10 #计算窗口大小,默认600s -flink.window.max.time=60 +flink.window.max.time=600 #dos event结果中distinct source IP限制 source.ip.list.limit=10000 @@ -89,8 +89,8 @@ baseline.sessions.severe.threshold=3 baseline.sessions.critical.threshold=8 #bifang服务访问地址 -#bifang.server.uri=http://192.168.44.72:80 -bifang.server.uri=http://192.168.44.3:80 +bifang.server.uri=http://192.168.44.72:80 +#bifang.server.uri=http://192.168.44.3:80 #访问bifang只读权限token,bifang内置,无需修改 bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867