This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
galaxy-tsg-olap-dos-detecti…/src/main/java/com/zdjizhi/etl/DosDetection.java

258 lines
12 KiB
Java
Raw Normal View History

2021-07-29 10:02:31 +08:00
package com.zdjizhi.etl;
import com.zdjizhi.common.CommonConfig;
2021-08-20 18:34:40 +08:00
import com.zdjizhi.common.DosDetectionThreshold;
2021-07-29 10:02:31 +08:00
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.utils.HbaseUtils;
2021-07-29 10:02:31 +08:00
import com.zdjizhi.utils.IpUtils;
import com.zdjizhi.utils.SnowflakeId;
2021-08-20 18:34:40 +08:00
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
2021-07-29 10:02:31 +08:00
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
2021-07-29 10:02:31 +08:00
import org.apache.flink.configuration.Configuration;
2021-08-20 18:34:40 +08:00
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
2021-07-29 10:02:31 +08:00
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
2021-07-29 10:02:31 +08:00
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
2021-07-29 10:02:31 +08:00
/**
* @author wlh
*/
public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
2021-07-29 10:02:31 +08:00
private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);
private static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
private final static int BASELINE_SIZE = 144;
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
2021-08-20 18:34:40 +08:00
private TreeRangeMap<IPAddress, Map<String, DosDetectionThreshold>> thresholdRangeMap;
2021-07-29 10:02:31 +08:00
@Override
public void open(Configuration parameters) {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build());
try {
executorService.scheduleAtFixedRate(() -> {
//do something
thresholdRangeMap = ParseStaticThreshold.createStaticThreshold();
}, 0, CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
executorService.scheduleAtFixedRate(() -> {
//do something
baselineMap = HbaseUtils.readFromHbase();
}, 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
} catch (Exception e) {
logger.error("定时器任务执行失败", e);
}
PERCENT_INSTANCE.setMinimumFractionDigits(2);
2021-07-29 10:02:31 +08:00
}
@Override
public DosEventLog map(DosSketchLog value) {
2021-08-20 18:34:40 +08:00
DosEventLog finalResult = null;
try {
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
2021-08-20 18:34:40 +08:00
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
Map<String, DosDetectionThreshold> thresholdMap = thresholdRangeMap.get(destinationIpAddress);
logger.debug("当前判断IP{}, 类型: {}", destinationIp, attackType);
if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap == null) {
finalResult = getDosEventLogByBaseline(value, destinationIp, attackType).f1;
} else if (baselineMap != null && !baselineMap.containsKey(destinationIp) && thresholdMap != null) {
finalResult = getDosEventLogByStaticThreshold(value, thresholdMap).f1;
} else if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap != null) {
Tuple2<Severity, DosEventLog> eventLogByBaseline = getDosEventLogByBaseline(value, destinationIp, attackType);
Tuple2<Severity, DosEventLog> eventLogByStaticThreshold = getDosEventLogByStaticThreshold(value, thresholdMap);
finalResult = mergeFinalResult(eventLogByBaseline, eventLogByStaticThreshold);
} else {
2021-08-20 18:34:40 +08:00
logger.debug("未获取到当前server IP{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType);
2021-07-29 10:02:31 +08:00
}
} catch (Exception e) {
logger.error("判定失败\n {} \n{}", value, e);
2021-07-29 10:02:31 +08:00
}
2021-08-20 18:34:40 +08:00
return finalResult;
}
private DosEventLog mergeFinalResult(Tuple2<Severity, DosEventLog> eventLogByBaseline, Tuple2<Severity, DosEventLog> eventLogByStaticThreshold) {
if (eventLogByBaseline.f0.score > eventLogByStaticThreshold.f0.score) {
logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold);
return mergeCondition(eventLogByBaseline.f1, eventLogByStaticThreshold.f1);
} else {
logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline);
return mergeCondition(eventLogByStaticThreshold.f1, eventLogByBaseline.f1);
}
}
private DosEventLog mergeCondition(DosEventLog log1, DosEventLog log2) {
if (log1 != null && log2 != null) {
String conditions1 = log1.getConditions();
String conditions2 = log2.getConditions();
log1.setConditions(conditions1 + " and " + conditions2);
}else if (log1 == null && log2 != null){
log1 = log2;
}
return log1;
2021-08-20 18:34:40 +08:00
}
private Tuple2<Severity, DosEventLog> getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) {
2021-08-20 18:34:40 +08:00
Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(floodTypeTup, value);
long sketchSessions = value.getSketch_sessions();
return sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD ?
getDosEventLog(value, base, sketchSessions - base, "baseline") : Tuple2.of(Severity.NORMAL, null);
2021-08-20 18:34:40 +08:00
}
private Tuple2<Severity, DosEventLog> getDosEventLogByStaticThreshold(DosSketchLog value, Map<String, DosDetectionThreshold> thresholdMap) {
Tuple2<Severity, DosEventLog> result = Tuple2.of(Severity.NORMAL, null);
2021-08-20 18:34:40 +08:00
String attackType = value.getAttack_type();
if (thresholdMap.containsKey(attackType)) {
DosDetectionThreshold threshold = thresholdMap.get(attackType);
long base = threshold.getSessionsPerSec();
long diff = value.getSketch_sessions() - base;
result = getDosEventLog(value, base, diff, "static");
2021-08-20 18:34:40 +08:00
}
return result;
}
private Tuple2<Severity, DosEventLog> getDosEventLog(DosSketchLog value, long base, long diff, String tag) {
2021-08-20 18:34:40 +08:00
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
Severity severity = Severity.NORMAL;
2021-08-20 18:34:40 +08:00
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
severity = judgeSeverity(percent);
2021-08-20 18:34:40 +08:00
if (severity != Severity.NORMAL) {
result = getResult(value, severity, percent, tag);
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,日志详情\n {}", destinationIp,attackType,base,percent,result);
2021-08-20 18:34:40 +08:00
} else {
logger.debug("当前server IP{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value.toString());
}
}
return Tuple2.of(severity, result);
2021-07-29 10:02:31 +08:00
}
private DosEventLog getResult(DosSketchLog value, Severity severity, double percent, String tag) {
2021-07-29 10:02:31 +08:00
DosEventLog dosEventLog = new DosEventLog();
dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.FLINK_WINDOW_MAX_TIME);
2021-07-29 10:02:31 +08:00
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), value.getSketch_sessions(), tag));
2021-07-29 10:02:31 +08:00
dosEventLog.setDestination_ip(value.getDestination_ip());
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
dosEventLog.setSource_ip_list(ipList);
dosEventLog.setSource_country_list(getSourceCountryList(ipList));
dosEventLog.setSession_rate(value.getSketch_sessions());
dosEventLog.setPacket_rate(value.getSketch_packets());
dosEventLog.setBit_rate(value.getSketch_bytes());
return dosEventLog;
}
private Integer getBaseValue(Tuple2<ArrayList<Integer>, Integer> floodTypeTup, DosSketchLog value) {
Integer base = 0;
try {
2021-08-20 18:34:40 +08:00
if (floodTypeTup != null) {
ArrayList<Integer> baselines = floodTypeTup.f0;
Integer defaultVaule = floodTypeTup.f1;
if (baselines != null && baselines.size() == BASELINE_SIZE) {
int timeIndex = getCurrentTimeIndex(value.getSketch_start_time());
base = baselines.get(timeIndex);
if (base == 0) {
logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}", value.getDestination_ip(), value.getAttack_type(), defaultVaule);
base = defaultVaule;
}
}
}
} catch (Exception e) {
logger.error("解析baseline数据失败,返回默认值0", e);
}
return base;
}
private String getConditions(String percent, long sessions, String tag) {
switch (tag) {
case "baseline":
return "sessions > " + percent + " of baseline";
case "static":
return "sessions > " + sessions + " sessions/s";
default:
return null;
}
2021-07-29 10:02:31 +08:00
}
private String getSourceCountryList(String sourceIpList) {
2021-07-29 10:02:31 +08:00
String[] ipArr = sourceIpList.split(",");
HashSet<String> countrySet = new HashSet<>();
for (String ip : ipArr) {
2021-07-29 10:02:31 +08:00
countrySet.add(IpUtils.ipLookup.countryLookup(ip));
}
return StringUtils.join(countrySet, ",");
2021-07-29 10:02:31 +08:00
}
private int getCurrentTimeIndex(long sketchStartTime) {
2021-07-29 10:02:31 +08:00
long currentDayTime = sketchStartTime / (60 * 60 * 24) * 60 * 60 * 24;
long indexLong = (sketchStartTime - currentDayTime) / 600;
return Integer.parseInt(Long.toString(indexLong));
}
private Double getDiffPercent(long diff, long base) {
return BigDecimal.valueOf((float)diff/base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
2021-07-29 10:02:31 +08:00
}
private Severity judgeSeverity(double diffPercent) {
if (diffPercent >= CommonConfig.BASELINE_SESSIONS_MINOR_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_WARNING_THRESHOLD) {
2021-07-29 10:02:31 +08:00
return Severity.MINOR;
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_WARNING_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD) {
2021-07-29 10:02:31 +08:00
return Severity.WARNING;
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD) {
2021-07-29 10:02:31 +08:00
return Severity.MAJOR;
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD) {
2021-07-29 10:02:31 +08:00
return Severity.SEVERE;
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD) {
2021-07-29 10:02:31 +08:00
return Severity.CRITICAL;
} else {
2021-07-29 10:02:31 +08:00
return Severity.NORMAL;
}
}
private enum Severity {
/**
* 判断严重程度枚举类型
*/
CRITICAL("Critical", 5),
SEVERE("Severe", 4),
MAJOR("Major", 3),
WARNING("Warning", 2),
MINOR("Minor", 1),
NORMAL("Normal", 0);
2021-07-29 10:02:31 +08:00
private final String severity;
private final int score;
2021-07-29 10:02:31 +08:00
@Override
public String toString() {
return this.severity;
}
Severity(String severity, int score) {
2021-07-29 10:02:31 +08:00
this.severity = severity;
this.score = score;
2021-07-29 10:02:31 +08:00
}
}
2021-07-29 10:02:31 +08:00
}