基于DoS Sketch一元组进行实时检测

This commit is contained in:
wanglihui
2021-09-14 18:46:23 +08:00
parent 8cfb442c44
commit 62f3c65d66
9 changed files with 118 additions and 77 deletions

View File

@@ -66,4 +66,5 @@ public class CommonConfig {
public static final String SASL_JAAS_CONFIG_USER = CommonConfigurations.getStringProperty("sasl.jaas.config.user");
public static final String SASL_JAAS_CONFIG_PASSWORD = CommonConfigurations.getStringProperty("sasl.jaas.config.password");
public static final int SASL_JAAS_CONFIG_FLAG = CommonConfigurations.getIntProperty("sasl.jaas.config.flag");
}

View File

@@ -65,15 +65,13 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
Map<String, DosDetectionThreshold> thresholdMap = thresholdRangeMap.get(destinationIpAddress);
logger.debug("当前判断IP{}, 类型: {}", destinationIp, attackType);
if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap == null) {
finalResult = getDosEventLogByBaseline(value, destinationIp, attackType).f1;
} else if (baselineMap != null && !baselineMap.containsKey(destinationIp) && thresholdMap != null) {
finalResult = getDosEventLogByStaticThreshold(value, thresholdMap).f1;
} else if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap != null) {
Tuple2<Severity, DosEventLog> eventLogByBaseline = getDosEventLogByBaseline(value, destinationIp, attackType);
Tuple2<Severity, DosEventLog> eventLogByStaticThreshold = getDosEventLogByStaticThreshold(value, thresholdMap);
finalResult = mergeFinalResult(eventLogByBaseline, eventLogByStaticThreshold);
} else {
if (thresholdMap == null && baselineMap.containsKey(destinationIp)) {
finalResult = getDosEventLogByBaseline(value, destinationIp, attackType);
}else if (thresholdMap == null && !baselineMap.containsKey(destinationIp)){
finalResult = getDosEventLogBySensitivityThreshold(value);
} else if (thresholdMap != null){
finalResult = getDosEventLogByStaticThreshold(value, thresholdMap);
}else {
logger.debug("未获取到当前server IP{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType);
}
@@ -83,37 +81,29 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return finalResult;
}
private DosEventLog mergeFinalResult(Tuple2<Severity, DosEventLog> eventLogByBaseline, Tuple2<Severity, DosEventLog> eventLogByStaticThreshold) {
if (eventLogByBaseline.f0.score > eventLogByStaticThreshold.f0.score) {
logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold);
return mergeCondition(eventLogByBaseline.f1, eventLogByStaticThreshold.f1);
} else {
logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline);
return mergeCondition(eventLogByStaticThreshold.f1, eventLogByBaseline.f1);
}
}
private DosEventLog mergeCondition(DosEventLog log1, DosEventLog log2) {
if (log1 != null && log2 != null) {
String conditions1 = log1.getConditions();
String conditions2 = log2.getConditions();
log1.setConditions(conditions1 + " and " + conditions2);
}else if (log1 == null && log2 != null){
log1 = log2;
}
return log1;
}
private Tuple2<Severity, DosEventLog> getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) {
Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(floodTypeTup, value);
private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value){
DosEventLog result = null;
long sketchSessions = value.getSketch_sessions();
return sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD ?
getDosEventLog(value, base, sketchSessions - base, "baseline") : Tuple2.of(Severity.NORMAL, null);
if (sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD){
result = getDosEventLog(value, CommonConfig.SENSITIVITY_THRESHOLD, sketchSessions - CommonConfig.SENSITIVITY_THRESHOLD, "sensitivity");
result.setSeverity(Severity.MAJOR.severity);
}
return result;
}
private Tuple2<Severity, DosEventLog> getDosEventLogByStaticThreshold(DosSketchLog value, Map<String, DosDetectionThreshold> thresholdMap) {
Tuple2<Severity, DosEventLog> result = Tuple2.of(Severity.NORMAL, null);
private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) {
DosEventLog result = null;
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD){
Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(floodTypeTup, value);
result = getDosEventLog(value, base, sketchSessions - base, "baseline");
}
return result;
}
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, Map<String, DosDetectionThreshold> thresholdMap) {
DosEventLog result = null;
String attackType = value.getAttack_type();
if (thresholdMap.containsKey(attackType)) {
DosDetectionThreshold threshold = thresholdMap.get(attackType);
@@ -124,14 +114,13 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return result;
}
private Tuple2<Severity, DosEventLog> getDosEventLog(DosSketchLog value, long base, long diff, String tag) {
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, String tag) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
Severity severity = Severity.NORMAL;
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
severity = judgeSeverity(percent);
Severity severity = judgeSeverity(percent);
if (severity != Severity.NORMAL) {
result = getResult(value, severity, percent, tag);
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,日志详情\n {}", destinationIp,attackType,base,percent,result);
@@ -139,7 +128,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
logger.debug("当前server IP{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value.toString());
}
}
return Tuple2.of(severity, result);
return result;
}
private DosEventLog getResult(DosSketchLog value, Severity severity, double percent, String tag) {
@@ -188,6 +177,8 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return "sessions > " + percent + " of baseline";
case "static":
return "sessions > " + sessions + " sessions/s";
case "sensitivity":
return sessions+" sessions/s Unusually high Sessions";
default:
return null;
}
@@ -208,6 +199,11 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return Integer.parseInt(Long.toString(indexLong));
}
public static void main(String[] args) {
System.out.println(1631579940 / (60 * 60) * 60 * 60);
System.out.println(new DosDetection().getCurrentTimeIndex(1631579940));
}
private Double getDiffPercent(long diff, long base) {
return BigDecimal.valueOf((float)diff/base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
}
@@ -254,4 +250,27 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
}
@Deprecated
private DosEventLog mergeFinalResult(Tuple2<Severity, DosEventLog> eventLogByBaseline, Tuple2<Severity, DosEventLog> eventLogByStaticThreshold) {
if (eventLogByBaseline.f0.score > eventLogByStaticThreshold.f0.score) {
logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold);
return mergeCondition(eventLogByBaseline.f1, eventLogByStaticThreshold.f1);
} else {
logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline);
return mergeCondition(eventLogByStaticThreshold.f1, eventLogByBaseline.f1);
}
}
@Deprecated
private DosEventLog mergeCondition(DosEventLog log1, DosEventLog log2) {
if (log1 != null && log2 != null) {
String conditions1 = log1.getConditions();
String conditions2 = log2.getConditions();
log1.setConditions(conditions1 + " and " + conditions2);
}else if (log1 == null && log2 != null){
log1 = log2;
}
return log1;
}
}

View File

@@ -42,7 +42,7 @@ public class ParseStaticThreshold {
String psw = HttpClientUtils.ERROR_MESSAGE;
try {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, String> parms = new HashMap<>();
HashMap<String, Object> parms = new HashMap<>();
parms.put("password", CommonConfig.BIFANG_SERVER_PASSWORD);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build());
@@ -75,7 +75,7 @@ public class ParseStaticThreshold {
try {
if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)) {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, String> parms = new HashMap<>();
HashMap<String, Object> parms = new HashMap<>();
parms.put("username", CommonConfig.BIFANG_SERVER_USER);
parms.put("password", encryptpwd);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms);
@@ -107,7 +107,9 @@ public class ParseStaticThreshold {
ArrayList<DosDetectionThreshold> thresholds = null;
try {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, null);
HashMap<String, Object> parms = new HashMap<>();
parms.put("pageSize",-1);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
String token = loginBifangServer();
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
@@ -142,22 +144,6 @@ public class ParseStaticThreshold {
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
for (DosDetectionThreshold threshold : dosDetectionThreshold) {
String attackType = threshold.getAttackType();
switch (attackType) {
case "tcp_syn_flood":
threshold.setAttackType("TCP SYN Flood");
break;
case "udp_flood":
threshold.setAttackType("UDP Flood");
break;
case "icmp_flood":
threshold.setAttackType("ICMP Flood");
break;
case "dns_amplification":
threshold.setAttackType("DNS Amplification");
break;
default:
}
ArrayList<String> serverIpList = threshold.getServerIpList();
for (String sip : serverIpList) {
IPAddressString ipAddressString = new IPAddressString(sip);
@@ -168,7 +154,16 @@ public class ParseStaticThreshold {
floodTypeThresholdMap = new HashMap<>();
}
floodTypeThresholdMap.put(threshold.getAttackType(), threshold);
thresholdRangeMap.put(Range.closed(address.getLower(), address.getUpper()), floodTypeThresholdMap);
if (address.isPrefixed()){
if (address.isMultiple()){
thresholdRangeMap.put(Range.closed(address.getLower(), address.getUpper()), floodTypeThresholdMap);
}else {
thresholdRangeMap.put(Range.closed(address.adjustPrefixLength(address.getBitCount()),
address.toMaxHost().withoutPrefixLength()), floodTypeThresholdMap);
}
}else {
thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
}
}
}
}
@@ -180,7 +175,15 @@ public class ParseStaticThreshold {
}
public static void main(String[] args) {
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
dosDetectionThreshold.forEach(System.out::println);
System.out.println("------------------------");
TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
/*
Map<Range<IPAddress>, Map<String, DosDetectionThreshold>> rangeMapMap = staticThreshold.asMapOfRanges();
for (Range<IPAddress> range : rangeMapMap.keySet()) {
Map<String, DosDetectionThreshold> thresholdMap = rangeMapMap.get(range);
@@ -189,6 +192,8 @@ public class ParseStaticThreshold {
System.out.println(range + "---" + type + "---" + threshold);
}
}
*/
}

View File

@@ -20,9 +20,11 @@ public class DosSketchSource {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
}
return streamExeEnv.addSource(new FlinkKafkaConsumer<String>(
CommonConfig.KAFKA_INPUT_TOPIC_NAME,

View File

@@ -31,7 +31,7 @@ public class HbaseUtils {
floodTypeList.add("TCP SYN Flood");
floodTypeList.add("UDP Flood");
floodTypeList.add("ICMP Flood");
floodTypeList.add("DNS Amplification");
floodTypeList.add("DNS Flood");
}
private static void prepareHbaseEnv() throws IOException {

View File

@@ -253,12 +253,12 @@ public class HttpClientUtils {
* 拼装url
* url ,参数map
*/
public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map<String, String> params) {
public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map<String, Object> params) {
try {
uriBuilder.setPath(path);
if (params != null && !params.isEmpty()){
for (Map.Entry<String, String> kv : params.entrySet()) {
uriBuilder.setParameter(kv.getKey(),kv.getValue());
for (Map.Entry<String, Object> kv : params.entrySet()) {
uriBuilder.setParameter(kv.getKey(),kv.getValue().toString());
}
}
} catch (Exception e) {

View File

@@ -11,10 +11,11 @@ public class KafkaUtils {
private static Properties getKafkaSinkProperty(){
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
}
return properties;
}

View File

@@ -11,8 +11,8 @@ kafka.input.parallelism=1
kafka.input.topic.name=DOS-SKETCH-RECORD
#输入kafka地址
kafka.input.bootstrap.servers=192.168.44.12:9092
#kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#kafka.input.bootstrap.servers=192.168.44.12:9092
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#读取kafka group id
kafka.input.group.id=2108231709
@@ -30,7 +30,7 @@ kafka.output.event.parallelism=1
#发送kafka event topic名
#kafka.output.event.topic.name=DOS-EVENT
kafka.output.event.topic.name=test
kafka.output.event.topic.name=storm-dos-test
#kafka输出地址
kafka.output.bootstrap.servers=192.168.44.12:9092
@@ -125,4 +125,7 @@ baseline.threshold.schedule.days=7
#kafka用户认证配置参数
sasl.jaas.config.user=admin
sasl.jaas.config.password=galaxy2019
sasl.jaas.config.password=galaxy2019
#是否开启kafka用户认证配置10
sasl.jaas.config.flag=1

View File

@@ -41,7 +41,16 @@ public class IpTest {
IPAddress pv43 = new IPAddressString("fc00::").getAddress();
IPAddress pv44 = new IPAddressString("fc00::10:1").getAddress();
IPAddress pv45 = new IPAddressString("12.56.4.0/32").getAddress();
IPAddress pv45 = new IPAddressString("12.56.4.3/24").getAddress();
IPAddress pv46 = new IPAddressString("12.56.4.0/24").getAddress();
IPAddress pv47 = new IPAddressString("12.56.4.0").getAddress();
System.out.println(pv45.isMultiple());
System.out.println(pv46.isMultiple());
System.out.println(pv46.isPrefixed());
System.out.println(pv47.isPrefixed());
System.out.println(pv45+"---"+pv45.toMaxHost().withoutPrefixLength()+"---"+pv45.adjustPrefixLength(pv45.getBitCount()));
/*
System.out.println(str5.getUpper()+"---"+str5.getLower());
System.out.println(rangeMap.span().contains(pv4));
@@ -50,6 +59,7 @@ public class IpTest {
System.out.println(rangeMap.get(pv42));
System.out.println(rangeMap.get(pv43));
System.out.println(rangeMap.get(pv44));
*/
/*
System.out.println(str5.toSequentialRange());