package com.zdjizhi.etl; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.*; import com.zdjizhi.utils.*; import inet.ipaddr.IPAddress; import inet.ipaddr.IPAddressString; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; import java.math.BigDecimal; import java.text.NumberFormat; import java.util.*; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author wlh */ public class DosDetection extends BroadcastProcessFunction, DosEventLog> { private static final Log logger = LogFactory.get(); private static Map> baselineMap = new HashMap<>(); private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); private HashMap>> thresholdRangeMap; private final static int BASELINE_SIZE = 144; private final static int STATIC_CONDITION_TYPE = 1; private final static int BASELINE_CONDITION_TYPE = 2; private final static int SENSITIVITY_CONDITION_TYPE = 3; private final static String SESSIONS_TAG = "sessions"; private final static String PACKETS_TAG = "packets"; private final static String BITS_TAG = "bits"; private final static int OTHER_BASELINE_TYPE = 3; @Override public void open(Configuration parameters) { ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build()); try { super.open(parameters); logger.info("begin init"); IpUtils.loadIpLook(); logger.info("init over"); executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0, CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES); executorService.scheduleAtFixedRate(() -> baselineMap = ParseBaselineThreshold.readFromHbase(), 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS); } catch (Exception e) { logger.error("定时器任务执行失败", e); } PERCENT_INSTANCE.setMinimumFractionDigits(2); } @Override public void processElement(DosSketchLog value, ReadOnlyContext ctx, Collector out) { DosEventLog finalResult = null; try { String destinationIp = value.getDestination_ip(); int vsysId = value.getVsys_id(); String key = destinationIp + "-" + vsysId; String attackType = value.getAttack_type(); IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress(); DosDetectionThreshold threshold = null; if (thresholdRangeMap.containsKey(vsysId)){ threshold = thresholdRangeMap.get(vsysId).getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress); } logger.debug("当前判断IP:{}, 类型: {}", key, attackType); if (threshold == null && baselineMap.containsKey(key)) { finalResult = getDosEventLogByBaseline(value,key); } else if (threshold == null && !baselineMap.containsKey(key)) { finalResult = getDosEventLogBySensitivityThreshold(value); } else if (threshold != null) { finalResult = getDosEventLogByStaticThreshold(value, threshold); } else { logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", key, attackType); } } catch (Exception e) { logger.error("判定失败\n {} \n{}", value, e); } if (finalResult != null){ out.collect(finalResult); } } @Override public void processBroadcastElement(Map value, Context ctx, Collector out) throws Exception { IpUtils.updateIpLook(value); } private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) { long sketchSessions = value.getSketch_sessions(); Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold"); long diff = sketchSessions - staticSensitivityThreshold; return getDosEventLog(value, staticSensitivityThreshold, diff,0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG); } private DosEventLog getDosEventLogByBaseline(DosSketchLog value,String key) { String attackType = value.getAttack_type(); long sketchSessions = value.getSketch_sessions(); DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType); Integer base = getBaseValue(dosBaselineThreshold, value); long diff = sketchSessions - base; return getDosEventLog(value, base, diff, 0,BASELINE_CONDITION_TYPE, SESSIONS_TAG); } private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) throws CloneNotSupportedException { long base = threshold.getSessionsPerSec(); long diff = value.getSketch_sessions() - base; long profileId = threshold.getProfileId(); DosEventLog result = getDosEventLog(value, base, diff, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG); if (result == null) { base = threshold.getPacketsPerSec(); diff = value.getSketch_packets() - base; profileId = threshold.getProfileId(); result = getDosEventLog(value, base, diff,profileId, STATIC_CONDITION_TYPE, PACKETS_TAG); if (result == null) { base = threshold.getBitsPerSec(); diff = value.getSketch_bytes() - base; profileId=threshold.getProfileId(); result = getDosEventLog(value, base, diff, profileId, STATIC_CONDITION_TYPE, BITS_TAG); } } /* ArrayList dosEventLogs = new ArrayList<>(); if (result != null){ dosEventLogs.add(result); Integer[] superiorIds = threshold.getSuperiorIds(); if (superiorIds != null && superiorIds.length > 0){ for (Integer integer:superiorIds){ DosEventLog clone = (DosEventLog) result.clone(); clone.setVsys_id(integer); clone.setLog_id(SnowflakeId.generateId()); dosEventLogs.add(clone); } } } */ return result; } private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) { DosEventLog result = null; String destinationIp = value.getDestination_ip(); String attackType = value.getAttack_type(); if (diff > 0 && base != 0) { double percent = getDiffPercent(diff, base); Severity severity = judgeSeverity(percent); Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold"); if (severity != Severity.NORMAL) { if (type == BASELINE_CONDITION_TYPE && percent < NacosUtils.getDoubleProperty("baseline.sensitivity.threshold")) { logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value); }else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold){ logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}",destinationIp, attackType, base, percent, value); }else { result = getResult(value, base, profileId, severity, percent+1, type, tag); if (type == SENSITIVITY_CONDITION_TYPE){ result.setSeverity(Severity.MAJOR.severity); } logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result); } } else { logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value); } } return result; } private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) { DosEventLog dosEventLog = new DosEventLog(); dosEventLog.setLog_id(SnowflakeId.generateId()); dosEventLog.setVsys_id(value.getVsys_id()); dosEventLog.setStart_time(value.getSketch_start_time()); dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration()); dosEventLog.setProfile_id(profileId); dosEventLog.setAttack_type(value.getAttack_type()); dosEventLog.setSeverity(severity.severity); dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag)); dosEventLog.setDestination_ip(value.getDestination_ip()); dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip())); String ipList = value.getSource_ip(); dosEventLog.setSource_ip_list(ipList); dosEventLog.setSource_country_list(getSourceCountryList(ipList)); dosEventLog.setSession_rate(value.getSketch_sessions()); dosEventLog.setPacket_rate(value.getSketch_packets()); dosEventLog.setBit_rate(value.getSketch_bytes()); return dosEventLog; } private Integer getBaseValue(DosBaselineThreshold dosBaselineThreshold, DosSketchLog value) { Integer base = 0; try { if (dosBaselineThreshold != null) { ArrayList baselines = dosBaselineThreshold.getSession_rate(); Integer defaultVaule = dosBaselineThreshold.getSession_rate_default_value(); Integer sessionRateBaselineType = dosBaselineThreshold.getSession_rate_baseline_type(); if (baselines != null && baselines.size() == BASELINE_SIZE) { int timeIndex = getCurrentTimeIndex(value.getSketch_start_time()); base = baselines.get(timeIndex); if (base == 0) { logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}", value.getDestination_ip(), value.getAttack_type(), defaultVaule); base = defaultVaule; } if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < NacosUtils.getIntProperty("static.sensitivity.threshold")){ base = NacosUtils.getIntProperty("static.sensitivity.threshold"); } } } } catch (Exception e) { logger.error("解析baseline数据失败,返回默认值0", e); } return base; } private String getConditions(String percent, long base, long sessions, int type, String tag) { switch (type) { case STATIC_CONDITION_TYPE: return "Rate > " + base + " " + tag + "/s"; case BASELINE_CONDITION_TYPE: return tag + " > " + percent + " of baseline"; case SENSITIVITY_CONDITION_TYPE: return String.valueOf(sessions) + " " + tag + "/s Unusually high " + StringUtils.capitalize(tag); default: throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]"); } } private String getSourceCountryList(String sourceIpList) { if (StringUtil.isNotBlank(sourceIpList)) { String countryList; try { String[] ipArr = sourceIpList.split(","); HashSet countrySet = new HashSet<>(); for (String ip : ipArr) { String country = IpUtils.ipLookup.countryLookup(ip); if (StringUtil.isNotBlank(country)){ countrySet.add(country); } } countryList = StringUtils.join(countrySet, ", "); return countryList; } catch (Exception e) { logger.error("{} source IP lists 获取国家失败", sourceIpList, e); return StringUtil.EMPTY; } } else { throw new IllegalArgumentException("Illegal Argument sourceIpList = null"); } } private int getCurrentTimeIndex(long sketchStartTime) { int index = 0; try { long currentDayTime = DateUtils.getTimeFloor(new Date(sketchStartTime * 1000L), "P1D").getTime() / 1000; long indexLong = (sketchStartTime - currentDayTime) / (86400 / BASELINE_SIZE); index = Integer.parseInt(Long.toString(indexLong)); } catch (Exception e) { logger.error("获取time index失败", e); } return index; } public static void main(String[] args) { // System.out.println(new DosDetection().getSourceCountryList("192.0.2.3,138.199.14.31,255.255.255.255,121.14.89.209," + // "23.200.74.224,161.117.68.253")); // DosDetection dosDetection = new DosDetection(); // System.out.println(dosDetection.judgeSeverity(dosDetection.getDiffPercent(499, 1000))); } private Double getDiffPercent(long diff, long base) { return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue(); } private Severity judgeSeverity(double diffPercent) { if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold")) { return Severity.MINOR; } else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.major.threshold")) { return Severity.WARNING; } else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.major.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold")) { return Severity.MAJOR; } else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold")) { return Severity.SEVERE; } else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold")) { return Severity.CRITICAL; } else { return Severity.NORMAL; } } private enum Severity { /** * 判断严重程度枚举类型 */ CRITICAL("Critical"), SEVERE("Severe"), MAJOR("Major"), WARNING("Warning"), MINOR("Minor"), NORMAL("Normal"); private final String severity; @Override public String toString() { return this.severity; } Severity(String severity) { this.severity = severity; } } }