From c692112445428b9e0b26a9cfc6da7352c12973d4 Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Tue, 19 Oct 2021 18:39:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8Epackets?= =?UTF-8?q?=E4=B8=8Ebits=E4=BD=9C=E4=B8=BAstatic=E6=9D=A1=E4=BB=B6?= =?UTF-8?q?=E5=88=A4=E6=96=AD=E4=BE=9D=E6=8D=AE=E3=80=82=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8Dstatic=E9=85=8D=E7=BD=AEIP=E5=86=B2=E7=AA=81=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/zdjizhi/etl/DosDetection.java | 134 +++++++++++------- .../com/zdjizhi/etl/ParseStaticThreshold.java | 32 ++--- src/main/resources/common.properties | 5 +- 3 files changed, 104 insertions(+), 67 deletions(-) diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index 688fccf..2fa3bfa 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -4,13 +4,11 @@ import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.DosDetectionThreshold; import com.zdjizhi.common.DosEventLog; import com.zdjizhi.common.DosSketchLog; -import com.zdjizhi.utils.DateUtils; -import com.zdjizhi.utils.HbaseUtils; -import com.zdjizhi.utils.IpUtils; -import com.zdjizhi.utils.SnowflakeId; +import com.zdjizhi.utils.*; import inet.ipaddr.IPAddress; import inet.ipaddr.IPAddressString; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.text.StrBuilder; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; @@ -64,11 +62,11 @@ public class DosDetection extends RichMapFunction { logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType); if ((thresholdMap == null || !thresholdMap.containsKey(attackType)) && baselineMap.containsKey(destinationIp)) { finalResult = getDosEventLogByBaseline(value); - }else if ((thresholdMap == null || !thresholdMap.containsKey(attackType)) && !baselineMap.containsKey(destinationIp)){ + } else if ((thresholdMap == null || !thresholdMap.containsKey(attackType)) && !baselineMap.containsKey(destinationIp)) { finalResult = getDosEventLogBySensitivityThreshold(value); - }else if (thresholdMap != null && thresholdMap.containsKey(attackType)){ + } else if (thresholdMap != null && thresholdMap.containsKey(attackType)) { finalResult = getDosEventLogByStaticThreshold(value, thresholdMap); - }else { + } else { logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType); } @@ -78,11 +76,11 @@ public class DosDetection extends RichMapFunction { return finalResult; } - private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value){ + private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) { DosEventLog result = null; long sketchSessions = value.getSketch_sessions(); - if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD){ - result = getDosEventLog(value, CommonConfig.STATIC_SENSITIVITY_THRESHOLD, sketchSessions - CommonConfig.STATIC_SENSITIVITY_THRESHOLD, "sensitivity"); + if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) { + result = getDosEventLog(value, CommonConfig.STATIC_SENSITIVITY_THRESHOLD, sketchSessions - CommonConfig.STATIC_SENSITIVITY_THRESHOLD, 3, "sessions"); result.setSeverity(Severity.MAJOR.severity); } return result; @@ -93,10 +91,10 @@ public class DosDetection extends RichMapFunction { String destinationIp = value.getDestination_ip(); String attackType = value.getAttack_type(); long sketchSessions = value.getSketch_sessions(); - if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD){ + if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) { Tuple2, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType); Integer base = getBaseValue(floodTypeTup, value); - result = getDosEventLog(value, base, sketchSessions - base, "baseline"); + result = getDosEventLog(value, base, sketchSessions - base, 2, "sessions"); } return result; } @@ -104,16 +102,27 @@ public class DosDetection extends RichMapFunction { private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, Map thresholdMap) { DosEventLog result = null; String attackType = value.getAttack_type(); + long base, diff; if (thresholdMap.containsKey(attackType)) { DosDetectionThreshold threshold = thresholdMap.get(attackType); - long base = threshold.getSessionsPerSec(); - long diff = value.getSketch_sessions() - base; - result = getDosEventLog(value, base, diff, "static"); + base = threshold.getSessionsPerSec(); + diff = value.getSketch_sessions() - base; + result = getDosEventLog(value, base, diff, 1, "sessions"); + if (result == null) { + base = threshold.getPacketsPerSec(); + diff = value.getSketch_packets() - base; + result = getDosEventLog(value, base, diff, 1, "packets"); + if (result == null) { + base = threshold.getBitsPerSec(); + diff = value.getSketch_bytes() - base; + result = getDosEventLog(value, base, diff, 1, "bits"); + } + } } return result; } - private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, String tag) { + private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, int type, String tag) { DosEventLog result = null; String destinationIp = value.getDestination_ip(); String attackType = value.getAttack_type(); @@ -121,11 +130,11 @@ public class DosDetection extends RichMapFunction { double percent = getDiffPercent(diff, base); Severity severity = judgeSeverity(percent); if (severity != Severity.NORMAL) { - if ("baseline".equals(tag) && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD){ - logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}",destinationIp,attackType,base,percent,value); - }else { - result = getResult(value,base, severity, percent, tag); - logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}检测,日志详情\n {}", destinationIp,attackType,base,percent,tag,result); + if (type == 2 && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD) { + logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value); + } else { + result = getResult(value, base, severity, percent, type, tag); + logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}检测,日志详情\n {}", destinationIp, attackType, base, percent, type, result); } } else { logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value); @@ -134,14 +143,14 @@ public class DosDetection extends RichMapFunction { return result; } - private DosEventLog getResult(DosSketchLog value,long base, Severity severity, double percent, String tag) { + private DosEventLog getResult(DosSketchLog value, long base, Severity severity, double percent, int type, String tag) { DosEventLog dosEventLog = new DosEventLog(); dosEventLog.setLog_id(SnowflakeId.generateId()); dosEventLog.setStart_time(value.getSketch_start_time()); dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration()); dosEventLog.setAttack_type(value.getAttack_type()); dosEventLog.setSeverity(severity.severity); - dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent),base, value.getSketch_sessions(), tag)); + dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag)); dosEventLog.setDestination_ip(value.getDestination_ip()); dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip())); String ipList = value.getSource_ip(); @@ -174,47 +183,74 @@ public class DosDetection extends RichMapFunction { return base; } - private String getConditions(String percent,long base, long sessions, String tag) { - switch (tag) { - case "baseline": - return "sessions > " + percent + " of baseline"; - case "static": - return "sessions > " + base + " sessions/s"; - case "sensitivity": - return sessions+" sessions/s Unusually high Sessions"; + private String getConditions(String percent, long base, long sessions, int type, String tag) { + switch (type) { + case 1: + return new StrBuilder() + .append(tag).append(" > ") + .append(base).append(" ") + .append(tag).append("/s") + .toString(); + case 2: + return new StrBuilder() + .append(tag).append(" > ") + .append(percent).append(" of baseline") + .toString(); + case 3: + return new StrBuilder() + .append(sessions).append(" ") + .append(tag).append("/s Unusually high ") + .append(StringUtils.capitalize(tag)) + .toString(); default: - return null; + throw new IllegalArgumentException("Illegal Argument " + type + ", known types = [1,2,3]"); } } private String getSourceCountryList(String sourceIpList) { - String[] ipArr = sourceIpList.split(","); - HashSet countrySet = new HashSet<>(); - for (String ip : ipArr) { - countrySet.add(IpUtils.ipLookup.countryLookup(ip)); + if (StringUtil.isNotBlank(sourceIpList)) { + String countryList; + try { + String[] ipArr = sourceIpList.split(","); + HashSet countrySet = new HashSet<>(); + for (String ip : ipArr) { + countrySet.add(IpUtils.ipLookup.countryLookup(ip)); + } + countryList = StringUtils.join(countrySet, ","); + return countryList; + } catch (Exception e) { + logger.error("{} source IP lists 获取国家失败", sourceIpList, e); + return StringUtil.EMPTY; + } + } else { + throw new IllegalArgumentException("Illegal Argument sourceIpList = null"); } - return StringUtils.join(countrySet, ","); } private int getCurrentTimeIndex(long sketchStartTime) { - long currentDayTime = DateUtils.getTimeFloor(new Date(sketchStartTime * 1000L), "P1D").getTime()/1000; - long indexLong = (sketchStartTime - currentDayTime) / 600; - return Integer.parseInt(Long.toString(indexLong)); + int index = 0; + try { + long currentDayTime = DateUtils.getTimeFloor(new Date(sketchStartTime * 1000L), "P1D").getTime() / 1000; + long indexLong = (sketchStartTime - currentDayTime) / (86400 / BASELINE_SIZE); + index = Integer.parseInt(Long.toString(indexLong)); + } catch (Exception e) { + logger.error("获取time index失败", e); + } + return index; } public static void main(String[] args) { Date date = new Date(1631548860 * 1000L); System.out.println(date); Date p1D = DateUtils.getTimeFloor(date, "P1D"); - System.out.println(p1D+" "+p1D.getTime()/1000); - System.out.println(new DosDetection().getCurrentTimeIndex(1631548860)); - System.out.println(10+10*0.2); - Map thresholdMap = null; - System.out.println(thresholdMap.containsKey("a")); + System.out.println(p1D + " " + p1D.getTime() / 1000); + System.out.println(new DosDetection().getCurrentTimeIndex(1634659080)); + System.out.println(new DosDetection().getConditions(PERCENT_INSTANCE.format(1.64862), 100, 100, 3, "packets")); + System.out.println(10 + 10 * 0.2); } private Double getDiffPercent(long diff, long base) { - return BigDecimal.valueOf((float)diff/base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue(); + return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue(); } private Severity judgeSeverity(double diffPercent) { @@ -262,10 +298,10 @@ public class DosDetection extends RichMapFunction { @Deprecated private DosEventLog mergeFinalResult(Tuple2 eventLogByBaseline, Tuple2 eventLogByStaticThreshold) { if (eventLogByBaseline.f0.score > eventLogByStaticThreshold.f0.score) { - logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold); + logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}", eventLogByBaseline, eventLogByStaticThreshold); return mergeCondition(eventLogByBaseline.f1, eventLogByStaticThreshold.f1); } else { - logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline); + logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}", eventLogByStaticThreshold, eventLogByBaseline); return mergeCondition(eventLogByStaticThreshold.f1, eventLogByBaseline.f1); } } @@ -276,7 +312,7 @@ public class DosDetection extends RichMapFunction { String conditions1 = log1.getConditions(); String conditions2 = log2.getConditions(); log1.setConditions(conditions1 + " and " + conditions2); - }else if (log1 == null && log2 != null){ + } else if (log1 == null && log2 != null) { log1 = log2; } return log1; diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java index 1af9bdb..2a4a96d 100644 --- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java +++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java @@ -108,15 +108,15 @@ public class ParseStaticThreshold { try { URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); HashMap parms = new HashMap<>(); - parms.put("pageSize",-1); - parms.put("orderBy","profileId asc"); - parms.put("isValid",1); + parms.put("pageSize", -1); + parms.put("orderBy", "profileId asc"); + parms.put("isValid", 1); HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms); String token = CommonConfig.BIFANG_SERVER_TOKEN; if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { BasicHeader authorization = new BasicHeader("Authorization", token); BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded"); - String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization,authorization1); + String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1); if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { HashMap resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); boolean success = (boolean) resposeMap.get("success"); @@ -154,42 +154,42 @@ public class ParseStaticThreshold { IPAddress address = ipAddressString.getAddress(); Map floodTypeThresholdMap = new HashMap<>(); floodTypeThresholdMap.put(threshold.getAttackType(), threshold); - if (address.isPrefixed()){ + if (address.isPrefixed()) { IPAddress lower = address.getLower(); IPAddress upper = address.getUpper(); - if (!address.isMultiple()){ + if (!address.isMultiple()) { lower = address.adjustPrefixLength(address.getBitCount()); upper = address.toMaxHost().withoutPrefixLength(); } Map.Entry, Map> lowerEntry = thresholdRangeMap.getEntry(lower); Map.Entry, Map> upperEntry = thresholdRangeMap.getEntry(upper); - if (lowerEntry == null && upperEntry == null){ - thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap); - }else if (lowerEntry != null && upperEntry == null){ + if (lowerEntry != null && upperEntry == null) { Range lowerEntryKey = lowerEntry.getKey(); Map lowerEntryValue = lowerEntry.getValue(); lowerEntryValue.put(threshold.getAttackType(), threshold); thresholdRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue); thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap); - }else if (lowerEntry == null){ + } else if (lowerEntry == null && upperEntry != null) { Range upperEntryKey = upperEntry.getKey(); Map upperEntryValue = upperEntry.getValue(); upperEntryValue.put(threshold.getAttackType(), threshold); thresholdRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue); thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap); + } else { + thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap); } - }else { + } else { Map.Entry, Map> entry = thresholdRangeMap.getEntry(address); - if (entry != null){ + if (entry != null) { Range entryKey = entry.getKey(); Map entryValue = entry.getValue(); - entryValue.put(threshold.getAttackType(), threshold); - if (entryKey.lowerEndpoint() == entryKey.upperEndpoint()){ + if (entryKey.lowerEndpoint() == entryKey.upperEndpoint()) { + entryValue.put(threshold.getAttackType(), threshold); thresholdRangeMap.put(Range.closed(address, address), entryValue); - }else { + } else { thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap); } - }else { + } else { thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap); } } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 2c11628..97286f7 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -60,7 +60,7 @@ flink.detection.map.parallelism=1 flink.watermark.max.orderness=10 #计算窗口大小,默认600s -flink.window.max.time=600 +flink.window.max.time=60 #dos event结果中distinct source IP限制 source.ip.list.limit=10000 @@ -89,7 +89,8 @@ baseline.sessions.severe.threshold=3 baseline.sessions.critical.threshold=8 #bifang服务访问地址 -bifang.server.uri=http://192.168.44.72:80 +#bifang.server.uri=http://192.168.44.72:80 +bifang.server.uri=http://192.168.44.3:80 #访问bifang只读权限token,bifang内置,无需修改 bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867