diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index 724c3e3..55b58fe 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -66,4 +66,5 @@ public class CommonConfig { public static final String SASL_JAAS_CONFIG_USER = CommonConfigurations.getStringProperty("sasl.jaas.config.user"); public static final String SASL_JAAS_CONFIG_PASSWORD = CommonConfigurations.getStringProperty("sasl.jaas.config.password"); + public static final int SASL_JAAS_CONFIG_FLAG = CommonConfigurations.getIntProperty("sasl.jaas.config.flag"); } diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index df141ad..69660d2 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -65,15 +65,13 @@ public class DosDetection extends RichMapFunction { IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress(); Map thresholdMap = thresholdRangeMap.get(destinationIpAddress); logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType); - if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap == null) { - finalResult = getDosEventLogByBaseline(value, destinationIp, attackType).f1; - } else if (baselineMap != null && !baselineMap.containsKey(destinationIp) && thresholdMap != null) { - finalResult = getDosEventLogByStaticThreshold(value, thresholdMap).f1; - } else if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap != null) { - Tuple2 eventLogByBaseline = getDosEventLogByBaseline(value, destinationIp, attackType); - Tuple2 eventLogByStaticThreshold = getDosEventLogByStaticThreshold(value, thresholdMap); - finalResult = mergeFinalResult(eventLogByBaseline, eventLogByStaticThreshold); - } else { + if (thresholdMap == null && baselineMap.containsKey(destinationIp)) { + finalResult = getDosEventLogByBaseline(value, destinationIp, attackType); + }else if (thresholdMap == null && !baselineMap.containsKey(destinationIp)){ + finalResult = getDosEventLogBySensitivityThreshold(value); + } else if (thresholdMap != null){ + finalResult = getDosEventLogByStaticThreshold(value, thresholdMap); + }else { logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType); } @@ -83,37 +81,29 @@ public class DosDetection extends RichMapFunction { return finalResult; } - private DosEventLog mergeFinalResult(Tuple2 eventLogByBaseline, Tuple2 eventLogByStaticThreshold) { - if (eventLogByBaseline.f0.score > eventLogByStaticThreshold.f0.score) { - logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold); - return mergeCondition(eventLogByBaseline.f1, eventLogByStaticThreshold.f1); - } else { - logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline); - return mergeCondition(eventLogByStaticThreshold.f1, eventLogByBaseline.f1); - } - } - - private DosEventLog mergeCondition(DosEventLog log1, DosEventLog log2) { - if (log1 != null && log2 != null) { - String conditions1 = log1.getConditions(); - String conditions2 = log2.getConditions(); - log1.setConditions(conditions1 + " and " + conditions2); - }else if (log1 == null && log2 != null){ - log1 = log2; - } - return log1; - } - - private Tuple2 getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) { - Tuple2, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType); - Integer base = getBaseValue(floodTypeTup, value); + private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value){ + DosEventLog result = null; long sketchSessions = value.getSketch_sessions(); - return sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD ? - getDosEventLog(value, base, sketchSessions - base, "baseline") : Tuple2.of(Severity.NORMAL, null); + if (sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD){ + result = getDosEventLog(value, CommonConfig.SENSITIVITY_THRESHOLD, sketchSessions - CommonConfig.SENSITIVITY_THRESHOLD, "sensitivity"); + result.setSeverity(Severity.MAJOR.severity); + } + return result; } - private Tuple2 getDosEventLogByStaticThreshold(DosSketchLog value, Map thresholdMap) { - Tuple2 result = Tuple2.of(Severity.NORMAL, null); + private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) { + DosEventLog result = null; + long sketchSessions = value.getSketch_sessions(); + if (sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD){ + Tuple2, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType); + Integer base = getBaseValue(floodTypeTup, value); + result = getDosEventLog(value, base, sketchSessions - base, "baseline"); + } + return result; + } + + private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, Map thresholdMap) { + DosEventLog result = null; String attackType = value.getAttack_type(); if (thresholdMap.containsKey(attackType)) { DosDetectionThreshold threshold = thresholdMap.get(attackType); @@ -124,14 +114,13 @@ public class DosDetection extends RichMapFunction { return result; } - private Tuple2 getDosEventLog(DosSketchLog value, long base, long diff, String tag) { + private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, String tag) { DosEventLog result = null; String destinationIp = value.getDestination_ip(); String attackType = value.getAttack_type(); - Severity severity = Severity.NORMAL; if (diff > 0 && base != 0) { double percent = getDiffPercent(diff, base); - severity = judgeSeverity(percent); + Severity severity = judgeSeverity(percent); if (severity != Severity.NORMAL) { result = getResult(value, severity, percent, tag); logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,日志详情\n {}", destinationIp,attackType,base,percent,result); @@ -139,7 +128,7 @@ public class DosDetection extends RichMapFunction { logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value.toString()); } } - return Tuple2.of(severity, result); + return result; } private DosEventLog getResult(DosSketchLog value, Severity severity, double percent, String tag) { @@ -188,6 +177,8 @@ public class DosDetection extends RichMapFunction { return "sessions > " + percent + " of baseline"; case "static": return "sessions > " + sessions + " sessions/s"; + case "sensitivity": + return sessions+" sessions/s Unusually high Sessions"; default: return null; } @@ -208,6 +199,11 @@ public class DosDetection extends RichMapFunction { return Integer.parseInt(Long.toString(indexLong)); } + public static void main(String[] args) { + System.out.println(1631579940 / (60 * 60) * 60 * 60); + System.out.println(new DosDetection().getCurrentTimeIndex(1631579940)); + } + private Double getDiffPercent(long diff, long base) { return BigDecimal.valueOf((float)diff/base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue(); } @@ -254,4 +250,27 @@ public class DosDetection extends RichMapFunction { } + @Deprecated + private DosEventLog mergeFinalResult(Tuple2 eventLogByBaseline, Tuple2 eventLogByStaticThreshold) { + if (eventLogByBaseline.f0.score > eventLogByStaticThreshold.f0.score) { + logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold); + return mergeCondition(eventLogByBaseline.f1, eventLogByStaticThreshold.f1); + } else { + logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline); + return mergeCondition(eventLogByStaticThreshold.f1, eventLogByBaseline.f1); + } + } + + @Deprecated + private DosEventLog mergeCondition(DosEventLog log1, DosEventLog log2) { + if (log1 != null && log2 != null) { + String conditions1 = log1.getConditions(); + String conditions2 = log2.getConditions(); + log1.setConditions(conditions1 + " and " + conditions2); + }else if (log1 == null && log2 != null){ + log1 = log2; + } + return log1; + } + } diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java index 3c2d1b4..ab8c5cd 100644 --- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java +++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java @@ -42,7 +42,7 @@ public class ParseStaticThreshold { String psw = HttpClientUtils.ERROR_MESSAGE; try { URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); - HashMap parms = new HashMap<>(); + HashMap parms = new HashMap<>(); parms.put("password", CommonConfig.BIFANG_SERVER_PASSWORD); HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms); String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build()); @@ -75,7 +75,7 @@ public class ParseStaticThreshold { try { if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)) { URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); - HashMap parms = new HashMap<>(); + HashMap parms = new HashMap<>(); parms.put("username", CommonConfig.BIFANG_SERVER_USER); parms.put("password", encryptpwd); HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms); @@ -107,7 +107,9 @@ public class ParseStaticThreshold { ArrayList thresholds = null; try { URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); - HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, null); + HashMap parms = new HashMap<>(); + parms.put("pageSize",-1); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms); String token = loginBifangServer(); if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { BasicHeader authorization = new BasicHeader("Authorization", token); @@ -142,22 +144,6 @@ public class ParseStaticThreshold { ArrayList dosDetectionThreshold = getDosDetectionThreshold(); if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) { for (DosDetectionThreshold threshold : dosDetectionThreshold) { - String attackType = threshold.getAttackType(); - switch (attackType) { - case "tcp_syn_flood": - threshold.setAttackType("TCP SYN Flood"); - break; - case "udp_flood": - threshold.setAttackType("UDP Flood"); - break; - case "icmp_flood": - threshold.setAttackType("ICMP Flood"); - break; - case "dns_amplification": - threshold.setAttackType("DNS Amplification"); - break; - default: - } ArrayList serverIpList = threshold.getServerIpList(); for (String sip : serverIpList) { IPAddressString ipAddressString = new IPAddressString(sip); @@ -168,7 +154,16 @@ public class ParseStaticThreshold { floodTypeThresholdMap = new HashMap<>(); } floodTypeThresholdMap.put(threshold.getAttackType(), threshold); - thresholdRangeMap.put(Range.closed(address.getLower(), address.getUpper()), floodTypeThresholdMap); + if (address.isPrefixed()){ + if (address.isMultiple()){ + thresholdRangeMap.put(Range.closed(address.getLower(), address.getUpper()), floodTypeThresholdMap); + }else { + thresholdRangeMap.put(Range.closed(address.adjustPrefixLength(address.getBitCount()), + address.toMaxHost().withoutPrefixLength()), floodTypeThresholdMap); + } + }else { + thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap); + } } } } @@ -180,7 +175,15 @@ public class ParseStaticThreshold { } public static void main(String[] args) { + + ArrayList dosDetectionThreshold = getDosDetectionThreshold(); + dosDetectionThreshold.forEach(System.out::println); + + + System.out.println("------------------------"); TreeRangeMap> staticThreshold = createStaticThreshold(); + + /* Map, Map> rangeMapMap = staticThreshold.asMapOfRanges(); for (Range range : rangeMapMap.keySet()) { Map thresholdMap = rangeMapMap.get(range); @@ -189,6 +192,8 @@ public class ParseStaticThreshold { System.out.println(range + "---" + type + "---" + threshold); } } + */ + } diff --git a/src/main/java/com/zdjizhi/source/DosSketchSource.java b/src/main/java/com/zdjizhi/source/DosSketchSource.java index af7f6ed..6980062 100644 --- a/src/main/java/com/zdjizhi/source/DosSketchSource.java +++ b/src/main/java/com/zdjizhi/source/DosSketchSource.java @@ -20,9 +20,11 @@ public class DosSketchSource { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS); properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID); - properties.setProperty("security.protocol", "SASL_PLAINTEXT"); - properties.setProperty("sasl.mechanism", "PLAIN"); - properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";"); + if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){ + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";"); + } return streamExeEnv.addSource(new FlinkKafkaConsumer( CommonConfig.KAFKA_INPUT_TOPIC_NAME, diff --git a/src/main/java/com/zdjizhi/utils/HbaseUtils.java b/src/main/java/com/zdjizhi/utils/HbaseUtils.java index c147e61..860ba55 100644 --- a/src/main/java/com/zdjizhi/utils/HbaseUtils.java +++ b/src/main/java/com/zdjizhi/utils/HbaseUtils.java @@ -31,7 +31,7 @@ public class HbaseUtils { floodTypeList.add("TCP SYN Flood"); floodTypeList.add("UDP Flood"); floodTypeList.add("ICMP Flood"); - floodTypeList.add("DNS Amplification"); + floodTypeList.add("DNS Flood"); } private static void prepareHbaseEnv() throws IOException { diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java index 6a9af77..6691b58 100644 --- a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java +++ b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java @@ -253,12 +253,12 @@ public class HttpClientUtils { * 拼装url * url ,参数map */ - public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map params) { + public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map params) { try { uriBuilder.setPath(path); if (params != null && !params.isEmpty()){ - for (Map.Entry kv : params.entrySet()) { - uriBuilder.setParameter(kv.getKey(),kv.getValue()); + for (Map.Entry kv : params.entrySet()) { + uriBuilder.setParameter(kv.getKey(),kv.getValue().toString()); } } } catch (Exception e) { diff --git a/src/main/java/com/zdjizhi/utils/KafkaUtils.java b/src/main/java/com/zdjizhi/utils/KafkaUtils.java index 0f32683..6042b37 100644 --- a/src/main/java/com/zdjizhi/utils/KafkaUtils.java +++ b/src/main/java/com/zdjizhi/utils/KafkaUtils.java @@ -11,10 +11,11 @@ public class KafkaUtils { private static Properties getKafkaSinkProperty(){ Properties properties = new Properties(); properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS); - properties.setProperty("security.protocol", "SASL_PLAINTEXT"); - properties.setProperty("sasl.mechanism", "PLAIN"); - properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";"); - + if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){ + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";"); + } return properties; } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index dfd3ef4..31487fa 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -11,8 +11,8 @@ kafka.input.parallelism=1 kafka.input.topic.name=DOS-SKETCH-RECORD #输入kafka地址 -kafka.input.bootstrap.servers=192.168.44.12:9092 -#kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 +#kafka.input.bootstrap.servers=192.168.44.12:9092 +kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 #读取kafka group id kafka.input.group.id=2108231709 @@ -30,7 +30,7 @@ kafka.output.event.parallelism=1 #发送kafka event topic名 #kafka.output.event.topic.name=DOS-EVENT -kafka.output.event.topic.name=test +kafka.output.event.topic.name=storm-dos-test #kafka输出地址 kafka.output.bootstrap.servers=192.168.44.12:9092 @@ -125,4 +125,7 @@ baseline.threshold.schedule.days=7 #kafka用户认证配置参数 sasl.jaas.config.user=admin -sasl.jaas.config.password=galaxy2019 \ No newline at end of file +sasl.jaas.config.password=galaxy2019 + +#是否开启kafka用户认证配置,1:是;0:否 +sasl.jaas.config.flag=1 \ No newline at end of file diff --git a/src/test/java/com/zdjizhi/common/IpTest.java b/src/test/java/com/zdjizhi/common/IpTest.java index b522634..3399d4b 100644 --- a/src/test/java/com/zdjizhi/common/IpTest.java +++ b/src/test/java/com/zdjizhi/common/IpTest.java @@ -41,7 +41,16 @@ public class IpTest { IPAddress pv43 = new IPAddressString("fc00::").getAddress(); IPAddress pv44 = new IPAddressString("fc00::10:1").getAddress(); - IPAddress pv45 = new IPAddressString("12.56.4.0/32").getAddress(); + IPAddress pv45 = new IPAddressString("12.56.4.3/24").getAddress(); + IPAddress pv46 = new IPAddressString("12.56.4.0/24").getAddress(); + IPAddress pv47 = new IPAddressString("12.56.4.0").getAddress(); + System.out.println(pv45.isMultiple()); + System.out.println(pv46.isMultiple()); + System.out.println(pv46.isPrefixed()); + System.out.println(pv47.isPrefixed()); + System.out.println(pv45+"---"+pv45.toMaxHost().withoutPrefixLength()+"---"+pv45.adjustPrefixLength(pv45.getBitCount())); + + /* System.out.println(str5.getUpper()+"---"+str5.getLower()); System.out.println(rangeMap.span().contains(pv4)); @@ -50,6 +59,7 @@ public class IpTest { System.out.println(rangeMap.get(pv42)); System.out.println(rangeMap.get(pv43)); System.out.println(rangeMap.get(pv44)); + */ /* System.out.println(str5.toSequentialRange());