新增读取DoS Detection Profiles IP冲突检测机制

修复DoS event日志end_time大于当前时间bug
This commit is contained in:
wanglihui
2021-09-26 18:41:36 +08:00
parent 77bc6a844e
commit c44250bf73
4 changed files with 20 additions and 10 deletions

View File

@@ -138,7 +138,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
DosEventLog dosEventLog = new DosEventLog();
dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.FLINK_WINDOW_MAX_TIME);
dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent),base, value.getSketch_sessions(), tag));

View File

@@ -60,11 +60,11 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
}
private Tuple6<Long, Long, Long,String,Long,Long> sketchAggregate(Iterable<DosSketchLog> elements){
int cnt = 1;
long sessions = 0;
long packets = 0 ;
long bytes = 0;
long startTime = 0;
long startTime = System.currentTimeMillis()/1000;
long endTime = System.currentTimeMillis()/1000;
long duration = 0;
HashSet<String> sourceIpSet = new HashSet<>();
try {
@@ -74,17 +74,16 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
sessions += newSketchLog.getSketch_sessions();
packets += newSketchLog.getSketch_packets();
bytes += newSketchLog.getSketch_bytes();
startTime = newSketchLog.getSketch_start_time();
duration = newSketchLog.getSketch_duration();
startTime = newSketchLog.getSketch_start_time() > startTime ? startTime : newSketchLog.getSketch_start_time();
endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime;
duration = endTime - startTime == 0 ? 5 : endTime - startTime;
}else {
if (sourceIpSet.size() < CommonConfig.SOURCE_IP_LIST_LIMIT){
sourceIpSet.add(sourceIp);
}
}
cnt += 1;
}
String sourceIpList = StringUtils.join(sourceIpSet, ",");
// return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration);
return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME,
bytes*8/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
}catch (Exception e){

View File

@@ -16,7 +16,6 @@ import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -180,7 +179,19 @@ public class ParseStaticThreshold {
thresholdRangeMap.put(Range.closed(lower, upper), floodTypeThresholdMap);
}
}else {
thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
Map.Entry<Range<IPAddress>, Map<String, DosDetectionThreshold>> entry = thresholdRangeMap.getEntry(address);
if (entry != null){
Range<IPAddress> entryKey = entry.getKey();
Map<String, DosDetectionThreshold> entryValue = entry.getValue();
entryValue.put(threshold.getAttackType(), threshold);
if (entryKey.lowerEndpoint() == entryKey.upperEndpoint()){
thresholdRangeMap.put(Range.closed(address, address), entryValue);
}else {
thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
}
}else {
thresholdRangeMap.put(Range.closed(address, address), floodTypeThresholdMap);
}
}
}
}

View File

@@ -60,7 +60,7 @@ flink.detection.map.parallelism=1
flink.watermark.max.orderness=10
#计算窗口大小默认600s
flink.window.max.time=10
flink.window.max.time=600
#dos event结果中distinct source IP限制
source.ip.list.limit=10000