diff --git a/pom.xml b/pom.xml index 9567eb7..f238d08 100644 --- a/pom.xml +++ b/pom.xml @@ -149,6 +149,7 @@ org.apache.hbase hbase-client 2.2.3 + slf4j-log4j12 diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index 815cc05..1f471fb 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -42,15 +42,11 @@ public class DosDetection extends RichMapFunction { ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build()); try { - executorService.scheduleAtFixedRate(() -> { - //do something - thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(); - }, 0, CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES); + executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0, + CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES); - executorService.scheduleAtFixedRate(() -> { - //do something - baselineMap = HbaseUtils.readFromHbase(); - }, 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS); + executorService.scheduleAtFixedRate(() -> baselineMap = HbaseUtils.readFromHbase(), 0, + CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS); } catch (Exception e) { logger.error("定时器任务执行失败", e); } @@ -67,10 +63,10 @@ public class DosDetection extends RichMapFunction { Map thresholdMap = thresholdRangeMap.get(destinationIpAddress); logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType); if (thresholdMap == null && baselineMap.containsKey(destinationIp)) { - finalResult = getDosEventLogByBaseline(value, destinationIp, attackType); + finalResult = getDosEventLogByBaseline(value); }else if (thresholdMap == null && !baselineMap.containsKey(destinationIp)){ finalResult = getDosEventLogBySensitivityThreshold(value); - } else if (thresholdMap != null){ + }else if (thresholdMap != null){ finalResult = getDosEventLogByStaticThreshold(value, thresholdMap); }else { logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType); @@ -92,8 +88,10 @@ public class DosDetection extends RichMapFunction { return result; } - private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) { + private DosEventLog getDosEventLogByBaseline(DosSketchLog value) { DosEventLog result = null; + String destinationIp = value.getDestination_ip(); + String attackType = value.getAttack_type(); long sketchSessions = value.getSketch_sessions(); if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD){ Tuple2, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType); @@ -127,7 +125,7 @@ public class DosDetection extends RichMapFunction { logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}",destinationIp,attackType,base,percent,value); }else { result = getResult(value,base, severity, percent, tag); - logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,日志详情\n {}", destinationIp,attackType,base,percent,result); + logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}检测,日志详情\n {}", destinationIp,attackType,base,percent,tag,result); } } else { logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value); diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java index 999b14f..6ac9fa3 100644 --- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java +++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java @@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -109,11 +110,14 @@ public class ParseStaticThreshold { URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); HashMap parms = new HashMap<>(); parms.put("pageSize",-1); + parms.put("orderBy","profileId asc"); + parms.put("isValid",1); HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms); String token = CommonConfig.BIFANG_SERVER_TOKEN; if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { BasicHeader authorization = new BasicHeader("Authorization", token); - String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization); + BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded"); + String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization,authorization1); if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { HashMap resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); boolean success = (boolean) resposeMap.get("success"); @@ -149,17 +153,31 @@ public class ParseStaticThreshold { IPAddressString ipAddressString = new IPAddressString(sip); if (ipAddressString.isIPAddress()) { IPAddress address = ipAddressString.getAddress(); - Map floodTypeThresholdMap = thresholdRangeMap.get(address); - if (floodTypeThresholdMap == null) { - floodTypeThresholdMap = new HashMap<>(); - } + Map floodTypeThresholdMap = new HashMap<>(); floodTypeThresholdMap.put(threshold.getAttackType(), threshold); if (address.isPrefixed()){ - if (address.isMultiple()){ - thresholdRangeMap.put(Range.closed(address.getLower(), address.getUpper()), floodTypeThresholdMap); - }else { - thresholdRangeMap.put(Range.closed(address.adjustPrefixLength(address.getBitCount()), - address.toMaxHost().withoutPrefixLength()), floodTypeThresholdMap); + IPAddress lower = address.getLower(); + IPAddress upper = address.getUpper(); + if (!address.isMultiple()){ + lower = address.adjustPrefixLength(address.getBitCount()); + upper = address.toMaxHost().withoutPrefixLength(); + } + Map.Entry, Map> lowerEntry = thresholdRangeMap.getEntry(lower); + Map.Entry, Map> upperEntry = thresholdRangeMap.getEntry(upper); + if (lowerEntry == null && upperEntry == null){ + thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap); + }else if (lowerEntry != null && upperEntry == null){ + Range lowerEntryKey = lowerEntry.getKey(); + Map lowerEntryValue = lowerEntry.getValue(); + lowerEntryValue.put(threshold.getAttackType(), threshold); + thresholdRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue); + thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap); + }else if (lowerEntry == null){ + Range upperEntryKey = upperEntry.getKey(); + Map upperEntryValue = upperEntry.getValue(); + upperEntryValue.put(threshold.getAttackType(), threshold); + thresholdRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue); + thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap); } }else { thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap); @@ -183,7 +201,7 @@ public class ParseStaticThreshold { System.out.println("------------------------"); TreeRangeMap> staticThreshold = createStaticThreshold(); - /* + System.out.println("------------------------"); Map, Map> rangeMapMap = staticThreshold.asMapOfRanges(); for (Range range : rangeMapMap.keySet()) { Map thresholdMap = rangeMapMap.get(range); @@ -192,7 +210,7 @@ public class ParseStaticThreshold { System.out.println(range + "---" + type + "---" + threshold); } } - */ + } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 75f5015..ce634ea 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -33,7 +33,7 @@ kafka.output.event.parallelism=1 kafka.output.event.topic.name=storm-dos-test #kafka输出地址 -kafka.output.bootstrap.servers=192.168.44.12:9092 +kafka.output.bootstrap.servers=192.168.44.12:9094 #kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 #zookeeper地址 @@ -60,7 +60,7 @@ flink.detection.map.parallelism=1 flink.watermark.max.orderness=10 #计算窗口大小,默认600s -flink.window.max.time=600 +flink.window.max.time=10 #dos event结果中distinct source IP限制 source.ip.list.limit=10000 diff --git a/src/test/java/com/zdjizhi/common/IpTest.java b/src/test/java/com/zdjizhi/common/IpTest.java index 3399d4b..4463f84 100644 --- a/src/test/java/com/zdjizhi/common/IpTest.java +++ b/src/test/java/com/zdjizhi/common/IpTest.java @@ -41,8 +41,8 @@ public class IpTest { IPAddress pv43 = new IPAddressString("fc00::").getAddress(); IPAddress pv44 = new IPAddressString("fc00::10:1").getAddress(); - IPAddress pv45 = new IPAddressString("12.56.4.3/24").getAddress(); - IPAddress pv46 = new IPAddressString("12.56.4.0/24").getAddress(); + IPAddress pv45 = new IPAddressString("192.168.42.1/32").getAddress(); + IPAddress pv46 = new IPAddressString("192.168.42.1/32").getAddress(); IPAddress pv47 = new IPAddressString("12.56.4.0").getAddress(); System.out.println(pv45.isMultiple()); System.out.println(pv46.isMultiple()); @@ -50,6 +50,8 @@ public class IpTest { System.out.println(pv47.isPrefixed()); System.out.println(pv45+"---"+pv45.toMaxHost().withoutPrefixLength()+"---"+pv45.adjustPrefixLength(pv45.getBitCount())); + System.out.println(pv45.adjustPrefixLength(pv45.getBitCount())+"---"+pv45.toMaxHost().withoutPrefixLength()); + /* System.out.println(str5.getUpper()+"---"+str5.getLower());