From 488b7c6644bf796f5f9fe7d5824c72cbe53a02de Mon Sep 17 00:00:00 2001 From: wanglihui Date: Tue, 6 Dec 2022 17:13:09 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=83=A8=E5=88=86=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/zdjizhi/etl/DosDetection.java | 78 +++++++++---------- .../com/zdjizhi/etl/EtlProcessFunction.java | 2 +- src/main/resources/common.properties | 10 +-- 3 files changed, 43 insertions(+), 47 deletions(-) diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index d1992d3..8190055 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -6,8 +6,7 @@ import com.zdjizhi.common.*; import com.zdjizhi.utils.*; import inet.ipaddr.IPAddress; import inet.ipaddr.IPAddressString; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.text.StrBuilder; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap; @@ -88,36 +87,31 @@ public class DosDetection extends ProcessFunction { } } catch (Exception e) { + e.printStackTrace(); logger.error("判定失败\n {} \n{}", value, e); } for (DosEventLog dosEventLog:finalResults){ - out.collect(dosEventLog); + if (dosEventLog != null){ + out.collect(dosEventLog); + } } } private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) { - DosEventLog result = null; long sketchSessions = value.getSketch_sessions(); - if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) { - long diff = sketchSessions - NacosUtils.getIntProperty("static.sensitivity.threshold"); - result = getDosEventLog(value, NacosUtils.getIntProperty("static.sensitivity.threshold"), diff, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG); - result.setSeverity(Severity.MAJOR.severity); - } - return result; + Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold"); + long diff = sketchSessions - staticSensitivityThreshold; + return getDosEventLog(value, staticSensitivityThreshold, diff, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG); } private DosEventLog getDosEventLogByBaseline(DosSketchLog value,String key) { - DosEventLog result = null; String attackType = value.getAttack_type(); long sketchSessions = value.getSketch_sessions(); - if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) { - DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType); - Integer base = getBaseValue(dosBaselineThreshold, value); - long diff = sketchSessions - base; - result = getDosEventLog(value, base, diff, BASELINE_CONDITION_TYPE, SESSIONS_TAG); - } - return result; + DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType); + Integer base = getBaseValue(dosBaselineThreshold, value); + long diff = sketchSessions - base; + return getDosEventLog(value, base, diff, BASELINE_CONDITION_TYPE, SESSIONS_TAG); } private ArrayList getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) throws CloneNotSupportedException { @@ -135,14 +129,16 @@ public class DosDetection extends ProcessFunction { } } ArrayList dosEventLogs = new ArrayList<>(); - dosEventLogs.add(result); - Integer[] superiorIds = threshold.getSuperiorIds(); - if (superiorIds != null && superiorIds.length > 0){ - for (Integer integer:superiorIds){ - DosEventLog clone = (DosEventLog) result.clone(); - clone.setVsys_id(integer); - clone.setLog_id(SnowflakeId.generateId()); - dosEventLogs.add(clone); + if (result != null){ + dosEventLogs.add(result); + Integer[] superiorIds = threshold.getSuperiorIds(); + if (superiorIds != null && superiorIds.length > 0){ + for (Integer integer:superiorIds){ + DosEventLog clone = (DosEventLog) result.clone(); + clone.setVsys_id(integer); + clone.setLog_id(SnowflakeId.generateId()); + dosEventLogs.add(clone); + } } } return dosEventLogs; @@ -155,11 +151,17 @@ public class DosDetection extends ProcessFunction { if (diff > 0 && base != 0) { double percent = getDiffPercent(diff, base); Severity severity = judgeSeverity(percent); + Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold"); if (severity != Severity.NORMAL) { if (type == BASELINE_CONDITION_TYPE && percent < NacosUtils.getDoubleProperty("baseline.sensitivity.threshold")) { logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value); - } else { + }else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold){ + logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}",destinationIp, attackType, base, percent, value); + }else { result = getResult(value, base, severity, percent+1, type, tag); + if (type == SENSITIVITY_CONDITION_TYPE){ + result.setSeverity(Severity.MAJOR.severity); + } logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result); } } else { @@ -217,22 +219,16 @@ public class DosDetection extends ProcessFunction { private String getConditions(String percent, long base, long sessions, int type, String tag) { switch (type) { case STATIC_CONDITION_TYPE: - return new StrBuilder() - .append("Rate > ") - .append(base).append(" ") - .append(tag).append("/s") - .toString(); + return "Rate > " + + base + " " + + tag + "/s"; case BASELINE_CONDITION_TYPE: - return new StrBuilder() - .append(tag).append(" > ") - .append(percent).append(" of baseline") - .toString(); + return tag + " > " + + percent + " of baseline"; case SENSITIVITY_CONDITION_TYPE: - return new StrBuilder() - .append(sessions).append(" ") - .append(tag).append("/s Unusually high ") - .append(StringUtils.capitalize(tag)) - .toString(); + return String.valueOf(sessions) + " " + + tag + "/s Unusually high " + + StringUtils.capitalize(tag); default: throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]"); } diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java index b4ac091..bfd4ad8 100644 --- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java @@ -4,7 +4,7 @@ import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.DosSketchLog; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 0cfedc0..6c4cbc5 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -15,7 +15,7 @@ kafka.input.topic.name=DOS-SKETCH-RECORD kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 #读取kafka group id -kafka.input.group.id=dos-detection-job-221010-1 +kafka.input.group.id=dos-detection-job-221125-1 #kafka.input.group.id=dos-detection-job-210813-1 #发送kafka metrics并行度大小 @@ -60,10 +60,10 @@ flink.first.agg.parallelism=1 flink.detection.map.parallelism=1 #watermark延迟 -flink.watermark.max.orderness=10 +flink.watermark.max.orderness=300 #计算窗口大小,默认600s -flink.window.max.time=10 +flink.window.max.time=600 #dos event结果中distinct source IP限制 source.ip.list.limit=10000 @@ -79,8 +79,8 @@ ip.mmdb.path=D:\\data\\dat\\ #ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/ #bifang服务访问地址 -#bifang.server.uri=http://192.168.44.72:80 -bifang.server.uri=http://192.168.44.3:80 +bifang.server.uri=http://192.168.44.72:80 +#bifang.server.uri=http://192.168.44.3:80 #加密密码路径信息 bifang.server.encryptpwd.path=/v1/user/encryptpwd