修改构建threshold RangeMap逻辑,基于attack type为key,避免IP冲突问题。
This commit is contained in:
@@ -33,7 +33,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
|
||||
private final static int BASELINE_SIZE = 144;
|
||||
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
|
||||
private TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap;
|
||||
private HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap;
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) {
|
||||
@@ -58,14 +58,14 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
String destinationIp = value.getDestination_ip();
|
||||
String attackType = value.getAttack_type();
|
||||
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
|
||||
Map<String, DosDetectionThreshold> thresholdMap = thresholdRangeMap.get(destinationIpAddress);
|
||||
DosDetectionThreshold threshold = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
|
||||
logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType);
|
||||
if ((thresholdMap == null || !thresholdMap.containsKey(attackType)) && baselineMap.containsKey(destinationIp)) {
|
||||
if (threshold == null && baselineMap.containsKey(destinationIp)) {
|
||||
finalResult = getDosEventLogByBaseline(value);
|
||||
} else if ((thresholdMap == null || !thresholdMap.containsKey(attackType)) && !baselineMap.containsKey(destinationIp)) {
|
||||
} else if (threshold == null && !baselineMap.containsKey(destinationIp)) {
|
||||
finalResult = getDosEventLogBySensitivityThreshold(value);
|
||||
} else if (thresholdMap != null && thresholdMap.containsKey(attackType)) {
|
||||
finalResult = getDosEventLogByStaticThreshold(value, thresholdMap);
|
||||
} else if (threshold != null) {
|
||||
finalResult = getDosEventLogByStaticThreshold(value, threshold);
|
||||
} else {
|
||||
logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType);
|
||||
}
|
||||
@@ -99,24 +99,18 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
return result;
|
||||
}
|
||||
|
||||
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, Map<String, DosDetectionThreshold> thresholdMap) {
|
||||
DosEventLog result = null;
|
||||
String attackType = value.getAttack_type();
|
||||
long base, diff;
|
||||
if (thresholdMap.containsKey(attackType)) {
|
||||
DosDetectionThreshold threshold = thresholdMap.get(attackType);
|
||||
base = threshold.getSessionsPerSec();
|
||||
diff = value.getSketch_sessions() - base;
|
||||
result = getDosEventLog(value, base, diff, 1, "sessions");
|
||||
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) {
|
||||
long base = threshold.getSessionsPerSec();
|
||||
long diff = value.getSketch_sessions() - base;
|
||||
DosEventLog result = getDosEventLog(value, base, diff, 1, "sessions");
|
||||
if (result == null) {
|
||||
base = threshold.getPacketsPerSec();
|
||||
diff = value.getSketch_packets() - base;
|
||||
result = getDosEventLog(value, base, diff, 1, "packets");
|
||||
if (result == null) {
|
||||
base = threshold.getPacketsPerSec();
|
||||
diff = value.getSketch_packets() - base;
|
||||
result = getDosEventLog(value, base, diff, 1, "packets");
|
||||
if (result == null) {
|
||||
base = threshold.getBitsPerSec();
|
||||
diff = value.getSketch_bytes() - base;
|
||||
result = getDosEventLog(value, base, diff, 1, "bits");
|
||||
}
|
||||
base = threshold.getBitsPerSec();
|
||||
diff = value.getSketch_bytes() - base;
|
||||
result = getDosEventLog(value, base, diff, 1, "bits");
|
||||
}
|
||||
}
|
||||
return result;
|
||||
@@ -134,7 +128,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
|
||||
} else {
|
||||
result = getResult(value, base, severity, percent, type, tag);
|
||||
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}检测,日志详情\n {}", destinationIp, attackType, base, percent, type, result);
|
||||
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result);
|
||||
}
|
||||
} else {
|
||||
logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value);
|
||||
|
||||
@@ -21,6 +21,9 @@ import static com.zdjizhi.sink.OutputStreamSink.outputTag;
|
||||
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String,String>, TimeWindow> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
|
||||
private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
|
||||
private static final String EMPTY_SOURCE_IP_IPV6 = "::";
|
||||
|
||||
@Override
|
||||
public void process(Tuple2<String, String> keys,
|
||||
Context context, Iterable<DosSketchLog> elements,
|
||||
@@ -70,7 +73,7 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
|
||||
try {
|
||||
for (DosSketchLog newSketchLog : elements){
|
||||
String sourceIp = newSketchLog.getSource_ip();
|
||||
if ("0.0.0.0".equals(sourceIp) || "::".equals(sourceIp)){
|
||||
if (StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV6)){
|
||||
sessions += newSketchLog.getSketch_sessions();
|
||||
packets += newSketchLog.getSketch_packets();
|
||||
bytes += newSketchLog.getSketch_bytes();
|
||||
|
||||
@@ -141,19 +141,19 @@ public class ParseStaticThreshold {
|
||||
*
|
||||
* @return threshold RangeMap
|
||||
*/
|
||||
static TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> createStaticThreshold() {
|
||||
TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap = TreeRangeMap.create();
|
||||
static HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> createStaticThreshold() {
|
||||
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap = new HashMap<>(4);
|
||||
try {
|
||||
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
|
||||
if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
|
||||
for (DosDetectionThreshold threshold : dosDetectionThreshold) {
|
||||
String attackType = threshold.getAttackType();
|
||||
TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create());
|
||||
ArrayList<String> serverIpList = threshold.getServerIpList();
|
||||
for (String sip : serverIpList) {
|
||||
IPAddressString ipAddressString = new IPAddressString(sip);
|
||||
if (ipAddressString.isIPAddress()) {
|
||||
IPAddress address = ipAddressString.getAddress();
|
||||
Map<String, DosDetectionThreshold> floodTypeThresholdMap = new HashMap<>();
|
||||
floodTypeThresholdMap.put(threshold.getAttackType(), threshold);
|
||||
if (address.isPrefixed()) {
|
||||
IPAddress lower = address.getLower();
|
||||
IPAddress upper = address.getUpper();
|
||||
@@ -161,40 +161,27 @@ public class ParseStaticThreshold {
|
||||
lower = address.adjustPrefixLength(address.getBitCount());
|
||||
upper = address.toMaxHost().withoutPrefixLength();
|
||||
}
|
||||
Map.Entry<Range<IPAddress>, Map<String, DosDetectionThreshold>> lowerEntry = thresholdRangeMap.getEntry(lower);
|
||||
Map.Entry<Range<IPAddress>, Map<String, DosDetectionThreshold>> upperEntry = thresholdRangeMap.getEntry(upper);
|
||||
Map.Entry<Range<IPAddress>, DosDetectionThreshold> lowerEntry = treeRangeMap.getEntry(lower);
|
||||
Map.Entry<Range<IPAddress>, DosDetectionThreshold> upperEntry = treeRangeMap.getEntry(upper);
|
||||
if (lowerEntry != null && upperEntry == null) {
|
||||
Range<IPAddress> lowerEntryKey = lowerEntry.getKey();
|
||||
Map<String, DosDetectionThreshold> lowerEntryValue = lowerEntry.getValue();
|
||||
lowerEntryValue.put(threshold.getAttackType(), threshold);
|
||||
thresholdRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue);
|
||||
thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap);
|
||||
DosDetectionThreshold lowerEntryValue = lowerEntry.getValue();
|
||||
treeRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue);
|
||||
treeRangeMap.put(Range.closed(lower, upper), threshold);
|
||||
} else if (lowerEntry == null && upperEntry != null) {
|
||||
Range<IPAddress> upperEntryKey = upperEntry.getKey();
|
||||
Map<String, DosDetectionThreshold> upperEntryValue = upperEntry.getValue();
|
||||
upperEntryValue.put(threshold.getAttackType(), threshold);
|
||||
thresholdRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue);
|
||||
thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap);
|
||||
DosDetectionThreshold upperEntryValue = upperEntry.getValue();
|
||||
treeRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue);
|
||||
treeRangeMap.put(Range.closed(lower, upper), threshold);
|
||||
} else {
|
||||
thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap);
|
||||
treeRangeMap.put(Range.closed(lower, upper), threshold);
|
||||
}
|
||||
} else {
|
||||
Map.Entry<Range<IPAddress>, Map<String, DosDetectionThreshold>> entry = thresholdRangeMap.getEntry(address);
|
||||
if (entry != null) {
|
||||
Range<IPAddress> entryKey = entry.getKey();
|
||||
Map<String, DosDetectionThreshold> entryValue = entry.getValue();
|
||||
if (entryKey.lowerEndpoint() == entryKey.upperEndpoint()) {
|
||||
entryValue.put(threshold.getAttackType(), threshold);
|
||||
thresholdRangeMap.put(Range.closed(address, address), entryValue);
|
||||
} else {
|
||||
thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
|
||||
}
|
||||
} else {
|
||||
thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
|
||||
}
|
||||
treeRangeMap.put(Range.closed(address, address), threshold);
|
||||
}
|
||||
}
|
||||
}
|
||||
thresholdRangeMap.put(attackType,treeRangeMap);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@@ -210,16 +197,17 @@ public class ParseStaticThreshold {
|
||||
|
||||
|
||||
System.out.println("------------------------");
|
||||
TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
|
||||
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
|
||||
|
||||
System.out.println("------------------------");
|
||||
Map<Range<IPAddress>, Map<String, DosDetectionThreshold>> rangeMapMap = staticThreshold.asMapOfRanges();
|
||||
for (Range<IPAddress> range : rangeMapMap.keySet()) {
|
||||
Map<String, DosDetectionThreshold> thresholdMap = rangeMapMap.get(range);
|
||||
for (String type : thresholdMap.keySet()) {
|
||||
DosDetectionThreshold threshold = thresholdMap.get(type);
|
||||
System.out.println(range + "---" + type + "---" + threshold);
|
||||
|
||||
for (String type : staticThreshold.keySet()) {
|
||||
Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = staticThreshold.get(type).asMapOfRanges();
|
||||
for (Range<IPAddress> range : asMapOfRanges.keySet()) {
|
||||
DosDetectionThreshold threshold = asMapOfRanges.get(range);
|
||||
System.out.println(type + "---" + range + "---" + threshold);
|
||||
}
|
||||
System.out.println("------------------------");
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ flink.detection.map.parallelism=1
|
||||
flink.watermark.max.orderness=10
|
||||
|
||||
#计算窗口大小,默认600s
|
||||
flink.window.max.time=60
|
||||
flink.window.max.time=600
|
||||
|
||||
#dos event结果中distinct source IP限制
|
||||
source.ip.list.limit=10000
|
||||
@@ -89,8 +89,8 @@ baseline.sessions.severe.threshold=3
|
||||
baseline.sessions.critical.threshold=8
|
||||
|
||||
#bifang服务访问地址
|
||||
#bifang.server.uri=http://192.168.44.72:80
|
||||
bifang.server.uri=http://192.168.44.3:80
|
||||
bifang.server.uri=http://192.168.44.72:80
|
||||
#bifang.server.uri=http://192.168.44.3:80
|
||||
|
||||
#访问bifang只读权限token,bifang内置,无需修改
|
||||
bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867
|
||||
|
||||
Reference in New Issue
Block a user