更新DoS检测程序,新增读取baseline TTL配置。

修复DoS检测-Conditions阈值描述语言逻辑问题。
This commit is contained in:
wanglihui
2021-12-08 13:51:09 +08:00
parent c46a395d9b
commit 75bbdd2962
4 changed files with 49 additions and 29 deletions

View File

@@ -28,6 +28,7 @@ public class CommonConfig {
public static final String HBASE_BASELINE_TABLE_NAME = CommonConfigurations.getStringProperty("hbase.baseline.table.name");
public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num");
public static final int HBASE_BASELINE_TTL = CommonConfigurations.getIntProperty("hbase.baseline.ttl");
public static final int FLINK_FIRST_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.first.agg.parallelism");
public static final int FLINK_DETECTION_MAP_PARALLELISM = CommonConfigurations.getIntProperty("flink.detection.map.parallelism");

View File

@@ -35,6 +35,10 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
private final static int BASELINE_CONDITION_TYPE = 2;
private final static int SENSITIVITY_CONDITION_TYPE = 3;
private final static String SESSIONS_TAG = "sessions";
private final static String PACKETS_TAG = "packets";
private final static String BITS_TAG = "bits";
private final static int OTHER_BASELINE_TYPE = 3;
@Override
@@ -82,7 +86,8 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
DosEventLog result = null;
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) {
result = getDosEventLog(value, CommonConfig.STATIC_SENSITIVITY_THRESHOLD, sketchSessions - CommonConfig.STATIC_SENSITIVITY_THRESHOLD, 3, "sessions");
long diff = sketchSessions - CommonConfig.STATIC_SENSITIVITY_THRESHOLD;
result = getDosEventLog(value, CommonConfig.STATIC_SENSITIVITY_THRESHOLD, diff, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
result.setSeverity(Severity.MAJOR.severity);
}
return result;
@@ -96,7 +101,8 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) {
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(dosBaselineThreshold, value);
result = getDosEventLog(value, base, sketchSessions - base, 2, "sessions");
long diff = sketchSessions - base;
result = getDosEventLog(value, base, diff, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
}
return result;
}
@@ -104,15 +110,15 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) {
long base = threshold.getSessionsPerSec();
long diff = value.getSketch_sessions() - base;
DosEventLog result = getDosEventLog(value, base, diff, 1, "sessions");
DosEventLog result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, SESSIONS_TAG);
if (result == null) {
base = threshold.getPacketsPerSec();
diff = value.getSketch_packets() - base;
result = getDosEventLog(value, base, diff, 1, "packets");
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, PACKETS_TAG);
if (result == null) {
base = threshold.getBitsPerSec();
diff = value.getSketch_bytes() - base;
result = getDosEventLog(value, base, diff, 1, "bits");
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, BITS_TAG);
}
}
return result;
@@ -129,7 +135,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
if (type == BASELINE_CONDITION_TYPE && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD) {
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
} else {
result = getResult(value, base, severity, percent, type, tag);
result = getResult(value, base, severity, percent+1, type, tag);
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result);
}
} else {
@@ -214,9 +220,12 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
String[] ipArr = sourceIpList.split(",");
HashSet<String> countrySet = new HashSet<>();
for (String ip : ipArr) {
countrySet.add(IpUtils.ipLookup.countryLookup(ip));
String country = IpUtils.ipLookup.countryLookup(ip);
if (StringUtil.isNotBlank(country)){
countrySet.add(country);
}
}
countryList = StringUtils.join(countrySet, ",");
countryList = StringUtils.join(countrySet, ", ");
return countryList;
} catch (Exception e) {
logger.error("{} source IP lists 获取国家失败", sourceIpList, e);
@@ -240,13 +249,11 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
}
public static void main(String[] args) {
Date date = new Date(1631548860 * 1000L);
System.out.println(date);
Date p1D = DateUtils.getTimeFloor(date, "P1D");
System.out.println(p1D + " " + p1D.getTime() / 1000);
System.out.println(new DosDetection().getCurrentTimeIndex(1634659080));
System.out.println(new DosDetection().getConditions(PERCENT_INSTANCE.format(1.64862), 100, 100, 3, "packets"));
System.out.println(10 + 10 * 0.2);
// System.out.println(new DosDetection().getSourceCountryList("192.0.2.3,138.199.14.31,255.255.255.255,121.14.89.209," +
// "23.200.74.224,161.117.68.253"));
// DosDetection dosDetection = new DosDetection();
// System.out.println(dosDetection.judgeSeverity(dosDetection.getDiffPercent(499, 1000)));
}
private Double getDiffPercent(long diff, long base) {

View File

@@ -2,6 +2,7 @@ package com.zdjizhi.etl;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosBaselineThreshold;
import com.zdjizhi.utils.DateUtils;
import com.zdjizhi.utils.HbaseUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -12,10 +13,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
public class ParseBaselineThreshold {
@@ -45,7 +43,11 @@ public class ParseBaselineThreshold {
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
Connection conn = ConnectionFactory.createConnection(config);
table = conn.getTable(tableName);
scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
long currentTimeMillis = System.currentTimeMillis();
scan = new Scan()
.setAllowPartialResults(true)
.setTimeRange(DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(CommonConfig.HBASE_BASELINE_TTL)).getTime(), currentTimeMillis)
.setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
logger.info("连接hbase成功正在读取baseline数据");
}
@@ -81,6 +83,13 @@ public class ParseBaselineThreshold {
}
public static void main(String[] args) {
long currentTimeMillis = System.currentTimeMillis();
long p200D = DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(CommonConfig.HBASE_BASELINE_TTL)).getTime();
System.out.println(p200D);
System.out.println(currentTimeMillis);
System.out.println(currentTimeMillis - p200D);
Map<String, Map<String, DosBaselineThreshold>> baselineMap = readFromHbase();
Set<String> keySet = baselineMap.keySet();
for (String key : keySet) {

View File

@@ -15,30 +15,30 @@ kafka.input.topic.name=DOS-SKETCH-RECORD
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#读取kafka group id
kafka.input.group.id=2109160928
kafka.input.group.id=2112080949
#kafka.input.group.id=dos-detection-job-210813-1
#发送kafka metrics并行度大小
kafka.output.metric.parallelism=1
#发送kafka metrics topic名
kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
#kafka.output.metric.topic.name=test
#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
kafka.output.metric.topic.name=test
#发送kafka event并行度大小
kafka.output.event.parallelism=1
#发送kafka event topic名
kafka.output.event.topic.name=DOS-EVENT
#kafka.output.event.topic.name=storm-dos-test
#kafka.output.event.topic.name=DOS-EVENT
kafka.output.event.topic.name=storm-dos-test
#kafka输出地址
kafka.output.bootstrap.servers=192.168.44.12:9094
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#zookeeper地址
#hbase.zookeeper.quorum=192.168.44.12:2181
hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
hbase.zookeeper.quorum=192.168.44.12:2181
#hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
#hbase客户端处理时间
hbase.client.operation.timeout=30000
@@ -50,6 +50,9 @@ hbase.baseline.table.name=dos:ddos_traffic_baselines
#读取baseline限制
hbase.baseline.total.num=1000000
#baseline ttl单位
hbase.baseline.ttl=30
#设置聚合并行度2个key
flink.first.agg.parallelism=1
@@ -89,8 +92,8 @@ baseline.sessions.severe.threshold=5
baseline.sessions.critical.threshold=8
#bifang服务访问地址
bifang.server.uri=http://192.168.44.72:80
#bifang.server.uri=http://192.168.44.3:80
#bifang.server.uri=http://192.168.44.72:80
bifang.server.uri=http://192.168.44.3:80
#访问bifang只读权限tokenbifang内置无需修改
bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867