package com.zdjizhi.function; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.geedgenetworks.utils.DateUtils; import com.zdjizhi.common.*; import com.zdjizhi.utils.Snowflakeld.SnowflakeId; import com.zdjizhi.utils.Threshold.ParseBaselineThreshold; import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import java.math.BigDecimal; import java.text.NumberFormat; import java.util.*; import java.util.stream.Collectors; import static com.zdjizhi.conf.DosConfigs.*; /** * @author wlh */ public class DosDetectionFunction extends ProcessFunction { private static final Log logger = LogFactory.get(); private Map> baselineMap = new HashMap<>(); private final NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); // private HashMap>> thresholdRangeMap; private final int BASELINE_SIZE = 144; private final int STATIC_CONDITION_TYPE = 1; private final int BASELINE_CONDITION_TYPE = 2; private final int SENSITIVITY_CONDITION_TYPE = 3; private final String SESSIONS_TAG = "sessions"; private final String PACKETS_TAG = "packets"; private final String BITS_TAG = "bits"; private final int OTHER_BASELINE_TYPE = 3; private SnowflakeId snowflakeId; private Configuration configuration; //private IpLookupUtils ipLookupUtils; private ParseBaselineThreshold parseBaselineThresholdld; @Override public void open(Configuration parameters) { configuration = (Configuration) getRuntimeContext() .getExecutionConfig().getGlobalJobParameters(); snowflakeId = new SnowflakeId(configuration.get(DATA_CENTER_ID_NUM), getRuntimeContext().getIndexOfThisSubtask()); try { parseBaselineThresholdld = new ParseBaselineThreshold(configuration); baselineMap = parseBaselineThresholdld.readFromHbase(); Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { baselineMap = parseBaselineThresholdld.readFromHbase(); logger.info("从Hbase获取baselineMap成功,baselineMap:" + baselineMap.toString()); } }, configuration.get(BASELINE_THRESHOLD_SCHEDULE_DAYS) * 24 * 60 * 60 * 1000, configuration.get(BASELINE_THRESHOLD_SCHEDULE_DAYS) * 24 * 60 * 60 * 1000); } catch (Exception e) { logger.error("从Hbase获取baselineMap失败,失败原因为:" + e); } PERCENT_INSTANCE.setMinimumFractionDigits(2); } @Override public void processElement(DosSketchLog value, Context ctx, Collector out) throws Exception { DosEventLog finalResult = null; try { if (value.getRule_id() == 0) { String destinationIp = value.getServer_ip(); int vsysId = value.getVsys_id(); String key = destinationIp + "-" + vsysId; String attackType = value.getAttack_type(); DosDetectionThreshold threshold = null; logger.debug("当前判断IP:{}, 类型: {}", key, attackType); if (threshold == null && baselineMap.containsKey(key)) { finalResult = getDosEventLogByBaseline(value, key); } else if (threshold == null && !baselineMap.containsKey(key)) { finalResult = getDosEventLogBySensitivityThreshold(value); } else { logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", key, attackType); } } else{ finalResult = getResult(value,0,0,Severity.MAJOR,0.0,0,"DoS Protection ["+value.getRule_id()+"]"); } } catch(Exception e){ logger.error("判定失败\n {} \n{}", value, e); } if (finalResult != null) { out.collect(finalResult); } } private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) { long sketchSessionsRate = value.getSession_rate(); Integer staticSensitivityThreshold = configuration.get(STATIC_SENSITIVITY_THRESHOLD); long diff = sketchSessionsRate - staticSensitivityThreshold; return getDosEventLog(value, staticSensitivityThreshold, diff, 0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG); } private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String key) { String attackType = value.getAttack_type(); long sketchSessionsRate = value.getSession_rate(); DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType); Integer baseSessionRate = getBaseValue(dosBaselineThreshold, value); long diff = sketchSessionsRate - baseSessionRate; return getDosEventLog(value, baseSessionRate, diff, 0, BASELINE_CONDITION_TYPE, SESSIONS_TAG); } private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) { DosEventLog result = null; String destinationIp = value.getServer_ip(); String attackType = value.getAttack_type(); if (diff > 0 && base != 0) { double percent = getDiffPercent(diff, base); Severity severity = judgeSeverity(percent); Integer staticSensitivityThreshold = configuration.get(STATIC_SENSITIVITY_THRESHOLD); if (severity != Severity.NORMAL) { if (type == BASELINE_CONDITION_TYPE && percent < configuration.get(BASELINE_SENSITIVITY_THRESHOLD)) { logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value); } else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSession_rate() < staticSensitivityThreshold) { logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value); } else { result = getResult(value, base, profileId, severity, percent, type, tag); if (type == SENSITIVITY_CONDITION_TYPE) { result.setSeverity(Severity.MAJOR.severity); } logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp, attackType, base, percent, type, tag, result); } } else { logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value); } } return result; } private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) { DosEventLog dosEventLog = new DosEventLog(); dosEventLog.setRecv_time(value.getRecv_time()); dosEventLog.setLog_id(snowflakeId.nextId()); dosEventLog.setVsys_id(value.getVsys_id()); dosEventLog.setStart_time(value.getStart_timestamp_ms()/1000); dosEventLog.setEnd_time(value.getEnd_timestamp_ms()/1000); dosEventLog.setProfile_id(profileId); dosEventLog.setRule_id(value.getRule_id()); dosEventLog.setAttack_type(value.getAttack_type()); if(base != 0) { dosEventLog.setSeverity(severity.severity); dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSession_rate(), type, tag, dosEventLog)); } else{ dosEventLog.setSeverity(severity.severity); dosEventLog.setConditions(tag); } dosEventLog.setDestination_ip(value.getServer_ip()); dosEventLog.setDestination_country(value.getServer_country()); dosEventLog.setSource_ip_list(value.getClient_ips().stream().filter(ip -> !ip.isEmpty()).collect(Collectors.joining(","))); dosEventLog.setSource_country_list(value.getClient_countrys().stream().filter(ip -> !ip.isEmpty()).collect(Collectors.joining(","))); dosEventLog.setSession_rate(value.getSession_rate()); dosEventLog.setPacket_rate(value.getPacket_rate()); dosEventLog.setBit_rate(value.getBit_rate()); dosEventLog.setBytes(value.getBytes()); dosEventLog.setSessions(value.getSessions()); dosEventLog.setPackets(value.getPkts()); return dosEventLog; } private Integer getBaseValue(DosBaselineThreshold dosBaselineThreshold, DosSketchLog value) { Integer base = 0; try { if (dosBaselineThreshold != null) { ArrayList baselines = dosBaselineThreshold.getSession_rate(); Integer defaultVaule = dosBaselineThreshold.getSession_rate_default_value(); Integer sessionRateBaselineType = dosBaselineThreshold.getSession_rate_baseline_type(); if (baselines != null && baselines.size() == BASELINE_SIZE) { int timeIndex = getCurrentTimeIndex(value.getStart_timestamp_ms()); base = baselines.get(timeIndex); if (base == 0) { logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}", value.getServer_ip(), value.getAttack_type(), defaultVaule); base = defaultVaule; } if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < configuration.get(STATIC_SENSITIVITY_THRESHOLD)) { base = configuration.get(STATIC_SENSITIVITY_THRESHOLD); } } } } catch (Exception e) { logger.error("解析baseline数据失败,返回默认值0", e); } return base; } private String getConditions(String percent, long base, long sessions, int type, String tag, DosEventLog dosEventLog) { int condition = 0; if ("Minor".equals(dosEventLog.getSeverity())) { condition = 50; } else if ("Warning".equals(dosEventLog.getSeverity())) { condition = 100; } else if ("Major".equals(dosEventLog.getSeverity())) { condition = 250; } else if ("Severe".equals(dosEventLog.getSeverity())) { condition = 500; } else if ("Critical".equals(dosEventLog.getSeverity())) { condition = 800; } switch (type) { case STATIC_CONDITION_TYPE: return "Rate > " + base + " " + tag + "/s" + "(>" + condition + "%)"; case BASELINE_CONDITION_TYPE: return tag + " > " + percent + " of baseline"; case SENSITIVITY_CONDITION_TYPE: return String.valueOf(sessions) + " " + tag + "/s Unusually high " + StringUtils.capitalize(tag); default: throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]"); } } private int getCurrentTimeIndex(long sketchStartTime) { int index = 0; try { long currentDayTime = DateUtils.getTimeFloor(new Date(sketchStartTime * 1000L), "P1D").getTime() / 1000; long indexLong = (sketchStartTime - currentDayTime) / (86400 / BASELINE_SIZE); index = Integer.parseInt(Long.toString(indexLong)); } catch (Exception e) { logger.error("获取time index失败", e); } return index; } private Double getDiffPercent(long diff, long base) { try { return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue(); } catch (Exception e) { logger.info("当前阈值为0,进行下一阈值条件判断", e); return 0.0; } } private Severity judgeSeverity(double diffPercent) { if (diffPercent >= configuration.get(BASELINE_SESSIONS_MINOR_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_WARNING_THRESHOLD)) { return Severity.MINOR; } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_WARNING_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_MAJOR_THRESHOLD)) { return Severity.WARNING; } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_MAJOR_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_SEVERE_THRESHOLD)) { return Severity.MAJOR; } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_SEVERE_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_CRITICAL_THRESHOLD)) { return Severity.SEVERE; } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_CRITICAL_THRESHOLD)) { return Severity.CRITICAL; } else { return Severity.NORMAL; } } private enum Severity { /** * 判断严重程度枚举类型 */ CRITICAL("Critical"), SEVERE("Severe"), MAJOR("Major"), WARNING("Warning"), MINOR("Minor"), NORMAL("Normal"); private final String severity; @Override public String toString() { return this.severity; } Severity(String severity) { this.severity = severity; } } }