1.适配IP定位库v4/v6合并后的加载逻辑(GAL-436)。

2.适配bifang23.11接口(TSG-17675)。
3.移除广播流。
4.修复静态阈值判断部分的BUG。
This commit is contained in:
wangchengcheng
2023-11-13 16:45:04 +08:00
parent c8a2a6b627
commit 52336accbd
40 changed files with 655 additions and 1952 deletions

View File

@@ -1,17 +1,19 @@
package com.zdjizhi.etl;
import cn.hutool.core.math.MathUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.geedgenetworks.utils.DateUtils;
import com.geedgenetworks.utils.StringUtil;
import com.zdjizhi.common.*;
import com.zdjizhi.utils.*;
import com.zdjizhi.utils.connections.nacos.NacosUtils;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
@@ -24,12 +26,12 @@ import java.util.concurrent.TimeUnit;
/**
* @author wlh
*/
public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<String, String>, DosEventLog> {
public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
private static final Log logger = LogFactory.get();
private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
private HashMap<Integer,HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap;
private HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap;
private final static int BASELINE_SIZE = 144;
private final static int STATIC_CONDITION_TYPE = 1;
@@ -42,22 +44,20 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
private final static int OTHER_BASELINE_TYPE = 3;
@Override
public void open(Configuration parameters) {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build());
try {
super.open(parameters);
logger.info("begin init");
IpUtils.loadIpLook();
logger.info("init over");
executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0,
CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
FlowWriteConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
executorService.scheduleAtFixedRate(() -> baselineMap = ParseBaselineThreshold.readFromHbase(), 0,
CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
FlowWriteConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
} catch (Exception e) {
logger.error("定时器任务执行失败", e);
}
@@ -65,7 +65,7 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
}
@Override
public void processElement(DosSketchLog value, ReadOnlyContext ctx, Collector<DosEventLog> out) {
public void processElement(DosSketchLog value, Context ctx, Collector<DosEventLog> out) throws Exception {
DosEventLog finalResult = null;
try {
String destinationIp = value.getDestination_ip();
@@ -75,13 +75,13 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
DosDetectionThreshold threshold = null;
if (thresholdRangeMap.containsKey(vsysId)){
if (thresholdRangeMap.containsKey(vsysId)) {
threshold = thresholdRangeMap.get(vsysId).getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
}
logger.debug("当前判断IP{}, 类型: {}", key, attackType);
if (threshold == null && baselineMap.containsKey(key)) {
finalResult = getDosEventLogByBaseline(value,key);
finalResult = getDosEventLogByBaseline(value, key);
} else if (threshold == null && !baselineMap.containsKey(key)) {
finalResult = getDosEventLogBySensitivityThreshold(value);
} else if (threshold != null) {
@@ -94,71 +94,62 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
logger.error("判定失败\n {} \n{}", value, e);
}
if (finalResult != null){
if (finalResult != null) {
out.collect(finalResult);
}
}
@Override
public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<DosEventLog> out) throws Exception {
if (!value.isEmpty()){
IpUtils.updateIpLook(value);
}
}
private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
long sketchSessions = value.getSketch_sessions();
Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold");
long diff = sketchSessions - staticSensitivityThreshold;
return getDosEventLog(value, staticSensitivityThreshold, diff,0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
return getDosEventLog(value, staticSensitivityThreshold, diff, 0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
}
private DosEventLog getDosEventLogByBaseline(DosSketchLog value,String key) {
private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String key) {
String attackType = value.getAttack_type();
long sketchSessions = value.getSketch_sessions();
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType);
Integer base = getBaseValue(dosBaselineThreshold, value);
long diff = sketchSessions - base;
return getDosEventLog(value, base, diff, 0,BASELINE_CONDITION_TYPE, SESSIONS_TAG);
return getDosEventLog(value, base, diff, 0, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
}
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) throws CloneNotSupportedException {
long sessionBase = threshold.getSessionsPerSec();
long pktBase=threshold.getPacketsPerSec();
long bitBase=threshold.getBitsPerSec();
long sessionBase = threshold.getSessions_per_sec();
long pktBase = threshold.getPackets_per_sec();
long bitBase = threshold.getBits_per_sec();
long diffSession = value.getSketch_sessions() - sessionBase;
long diffPkt = value.getSketch_packets() - pktBase;
long diffByte = value.getSketch_bytes() - bitBase;
// Double diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
// Double diffPktPercent = getDiffPercent(diffPkt, pktBase)*100;
// Double diffBitPercent = getDiffPercent(diffByte, bitBase)*100;
double diffSessionPercent=0.0;
double diffPktPercent=0.0;
double diffBitPercent=0.0;
if (sessionBase != 0 && sessionBase > 0){
diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
double diffSessionPercent = 0.0;
double diffPktPercent = 0.0;
double diffBitPercent = 0.0;
//todo 代码Review发现该部分存在bug23.11版本做修复,需测试。
if (sessionBase > 0) {
diffSessionPercent = getDiffPercent(diffSession, sessionBase) * 100;
}
else if (pktBase != 0 && pktBase > 0){
diffPktPercent = getDiffPercent(diffPkt, pktBase)*100;
if (pktBase > 0) {
diffPktPercent = getDiffPercent(diffPkt, pktBase) * 100;
}
else if (bitBase != 0 && bitBase > 0){
diffBitPercent = getDiffPercent(diffByte, bitBase)*100;
if (bitBase > 0) {
diffBitPercent = getDiffPercent(diffByte, bitBase) * 100;
}
long profileId = 0;
DosEventLog result =null;
DosEventLog result = null;
if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent){
profileId = threshold.getProfileId();
result= getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
}else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent){
profileId = threshold.getProfileId();
result = getDosEventLog(value, pktBase, diffPkt,profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
}else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent){
profileId = threshold.getProfileId();
if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent) {
profileId = threshold.getProfile_id();
result = getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
} else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent) {
profileId = threshold.getProfile_id();
result = getDosEventLog(value, pktBase, diffPkt, profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
} else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent) {
profileId = threshold.getProfile_id();
result = getDosEventLog(value, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
}
return result;
@@ -175,15 +166,15 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
if (severity != Severity.NORMAL) {
if (type == BASELINE_CONDITION_TYPE && percent < NacosUtils.getDoubleProperty("baseline.sensitivity.threshold")) {
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
}else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold){
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}",destinationIp, attackType, base, percent, value);
}else {
} else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold) {
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
} else {
// result = getResult(value, base, profileId, severity, percent+1, type, tag);
result = getResult(value, base, profileId, severity, percent, type, tag);
if (type == SENSITIVITY_CONDITION_TYPE){
if (type == SENSITIVITY_CONDITION_TYPE) {
result.setSeverity(Severity.MAJOR.severity);
}
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result);
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp, attackType, base, percent, type, tag, result);
}
} else {
logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value);
@@ -201,10 +192,10 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
dosEventLog.setProfile_id(profileId);
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag,dosEventLog));
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag, dosEventLog));
// dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag,dosEventLog));
dosEventLog.setDestination_ip(value.getDestination_ip());
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
dosEventLog.setDestination_country(IpLookupUtils.getCountryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
dosEventLog.setSource_ip_list(ipList);
dosEventLog.setSource_country_list(getSourceCountryList(ipList));
@@ -228,7 +219,7 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}", value.getDestination_ip(), value.getAttack_type(), defaultVaule);
base = defaultVaule;
}
if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < NacosUtils.getIntProperty("static.sensitivity.threshold")){
if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < NacosUtils.getIntProperty("static.sensitivity.threshold")) {
base = NacosUtils.getIntProperty("static.sensitivity.threshold");
}
}
@@ -239,24 +230,24 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
return base;
}
private String getConditions(String percent, long base, long sessions, int type, String tag,DosEventLog dosEventLog) {
int condition =0;
if ("Minor".equals(dosEventLog.getSeverity())){
condition=50;
}else if ("Warning".equals(dosEventLog.getSeverity())){
condition=100;
}else if ("Major".equals(dosEventLog.getSeverity())){
condition=250;
}else if ("Severe".equals(dosEventLog.getSeverity())){
condition=500;
}else if ("Critical".equals(dosEventLog.getSeverity())){
condition =800;
private String getConditions(String percent, long base, long sessions, int type, String tag, DosEventLog dosEventLog) {
int condition = 0;
if ("Minor".equals(dosEventLog.getSeverity())) {
condition = 50;
} else if ("Warning".equals(dosEventLog.getSeverity())) {
condition = 100;
} else if ("Major".equals(dosEventLog.getSeverity())) {
condition = 250;
} else if ("Severe".equals(dosEventLog.getSeverity())) {
condition = 500;
} else if ("Critical".equals(dosEventLog.getSeverity())) {
condition = 800;
}
switch (type) {
case STATIC_CONDITION_TYPE:
return "Rate > " +
base + " " +
tag + "/s" + "(>"+condition+"%)";
tag + "/s" + "(>" + condition + "%)";
case BASELINE_CONDITION_TYPE:
return tag + " > " +
percent + " of baseline";
@@ -276,8 +267,8 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
String[] ipArr = sourceIpList.split(",");
HashSet<String> countrySet = new HashSet<>();
for (String ip : ipArr) {
String country = IpUtils.ipLookup.countryLookup(ip);
if (StringUtil.isNotBlank(country)){
String country = IpLookupUtils.getCountryLookup(ip);
if (StringUtil.isNotBlank(country)) {
countrySet.add(country);
}
}
@@ -304,19 +295,13 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<Stri
return index;
}
public static void main(String[] args) {
// System.out.println(new DosDetection().getSourceCountryList("192.0.2.3,138.199.14.31,255.255.255.255,121.14.89.209," +
// "23.200.74.224,161.117.68.253"));
// DosDetection dosDetection = new DosDetection();
// System.out.println(dosDetection.judgeSeverity(dosDetection.getDiffPercent(499, 1000)));
}
private Double getDiffPercent(long diff, long base) {
try {
return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
}catch (Exception e){
logger.info("当前阈值为0,进行下一阈值条件判断",e);
} catch (Exception e) {
logger.info("当前阈值为0,进行下一阈值条件判断", e);
return 0.0;
}