Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
859cd379e5 | ||
|
|
47ddef9bca | ||
|
|
0a6f36393c |
@@ -1,7 +1,6 @@
|
||||
package com.zdjizhi.common;
|
||||
|
||||
import com.zdjizhi.utils.CommonConfigurations;
|
||||
import com.zdjizhi.utils.NacosUtils;
|
||||
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
|
||||
|
||||
/**
|
||||
@@ -49,17 +48,7 @@ public class CommonConfig {
|
||||
|
||||
public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path");
|
||||
|
||||
// public static final int STATIC_SENSITIVITY_THRESHOLD = NacosUtils.getIntProperty("static.sensitivity.threshold");
|
||||
// public static final double BASELINE_SENSITIVITY_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sensitivity.threshold");
|
||||
//
|
||||
// public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold");
|
||||
// public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold");
|
||||
// public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.major.threshold");
|
||||
// public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold");
|
||||
// public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold");
|
||||
|
||||
public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri");
|
||||
public static final String BIFANG_SERVER_TOKEN = CommonConfigurations.getStringProperty("bifang.server.token");
|
||||
public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = CommonConfigurations.getStringProperty("bifang.server.encryptpwd.path");
|
||||
public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path");
|
||||
public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path");
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.zdjizhi.common;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
@@ -16,44 +17,8 @@ public class DosDetectionThreshold implements Serializable {
|
||||
private long bitsPerSec;
|
||||
private long sessionsPerSec;
|
||||
private int isValid;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DosDetectionThreshold{" +
|
||||
"profileId='" + profileId + '\'' +
|
||||
", attackType='" + attackType + '\'' +
|
||||
", serverIpList=" + serverIpList +
|
||||
", serverIpAddr='" + serverIpAddr + '\'' +
|
||||
", packetsPerSec=" + packetsPerSec +
|
||||
", bitsPerSec=" + bitsPerSec +
|
||||
", sessionsPerSec=" + sessionsPerSec +
|
||||
", isValid=" + isValid +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
DosDetectionThreshold threshold = (DosDetectionThreshold) o;
|
||||
return packetsPerSec == threshold.packetsPerSec &&
|
||||
bitsPerSec == threshold.bitsPerSec &&
|
||||
sessionsPerSec == threshold.sessionsPerSec &&
|
||||
isValid == threshold.isValid &&
|
||||
Objects.equals(profileId, threshold.profileId) &&
|
||||
Objects.equals(attackType, threshold.attackType) &&
|
||||
Objects.equals(serverIpList, threshold.serverIpList) &&
|
||||
Objects.equals(serverIpAddr, threshold.serverIpAddr);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(profileId, attackType, serverIpList, serverIpAddr, packetsPerSec, bitsPerSec, sessionsPerSec, isValid);
|
||||
}
|
||||
private int vsysId;
|
||||
private Integer[] superiorIds;
|
||||
|
||||
public String getProfileId() {
|
||||
return profileId;
|
||||
@@ -118,4 +83,36 @@ public class DosDetectionThreshold implements Serializable {
|
||||
public void setIsValid(int isValid) {
|
||||
this.isValid = isValid;
|
||||
}
|
||||
|
||||
public int getVsysId() {
|
||||
return vsysId;
|
||||
}
|
||||
|
||||
public void setVsysId(int vsysId) {
|
||||
this.vsysId = vsysId;
|
||||
}
|
||||
|
||||
public Integer[] getSuperiorIds() {
|
||||
return superiorIds;
|
||||
}
|
||||
|
||||
public void setSuperiorIds(Integer[] superiorIds) {
|
||||
this.superiorIds = superiorIds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DosDetectionThreshold{" +
|
||||
"profileId='" + profileId + '\'' +
|
||||
", attackType='" + attackType + '\'' +
|
||||
", serverIpList=" + serverIpList +
|
||||
", serverIpAddr='" + serverIpAddr + '\'' +
|
||||
", packetsPerSec=" + packetsPerSec +
|
||||
", bitsPerSec=" + bitsPerSec +
|
||||
", sessionsPerSec=" + sessionsPerSec +
|
||||
", isValid=" + isValid +
|
||||
", vsysId=" + vsysId +
|
||||
", superiorIds=" + Arrays.toString(superiorIds) +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package com.zdjizhi.common;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Objects;
|
||||
|
||||
public class DosEventLog implements Serializable {
|
||||
public class DosEventLog implements Serializable,Cloneable {
|
||||
|
||||
private long log_id;
|
||||
private int vsys_id;
|
||||
private long start_time;
|
||||
private long end_time;
|
||||
private String attack_type;
|
||||
@@ -27,6 +27,14 @@ public class DosEventLog implements Serializable {
|
||||
this.log_id = log_id;
|
||||
}
|
||||
|
||||
public int getVsys_id() {
|
||||
return vsys_id;
|
||||
}
|
||||
|
||||
public void setVsys_id(int vsys_id) {
|
||||
this.vsys_id = vsys_id;
|
||||
}
|
||||
|
||||
public long getStart_time() {
|
||||
return start_time;
|
||||
}
|
||||
@@ -125,8 +133,9 @@ public class DosEventLog implements Serializable {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "dosEventLog{" +
|
||||
return "DosEventLog{" +
|
||||
"log_id=" + log_id +
|
||||
", vsys_id=" + vsys_id +
|
||||
", start_time=" + start_time +
|
||||
", end_time=" + end_time +
|
||||
", attack_type='" + attack_type + '\'' +
|
||||
@@ -143,31 +152,7 @@ public class DosEventLog implements Serializable {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof DosEventLog)) {
|
||||
return false;
|
||||
}
|
||||
DosEventLog that = (DosEventLog) o;
|
||||
return getLog_id() == that.getLog_id() &&
|
||||
getStart_time() == that.getStart_time() &&
|
||||
getEnd_time() == that.getEnd_time() &&
|
||||
getSession_rate() == that.getSession_rate() &&
|
||||
getPacket_rate() == that.getPacket_rate() &&
|
||||
getBit_rate() == that.getBit_rate() &&
|
||||
Objects.equals(getAttack_type(), that.getAttack_type()) &&
|
||||
Objects.equals(getSeverity(), that.getSeverity()) &&
|
||||
Objects.equals(getConditions(), that.getConditions()) &&
|
||||
Objects.equals(getDestination_ip(), that.getDestination_ip()) &&
|
||||
Objects.equals(getDestination_country(), that.getDestination_country()) &&
|
||||
Objects.equals(getSource_ip_list(), that.getSource_ip_list()) &&
|
||||
Objects.equals(getSource_country_list(), that.getSource_country_list());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(getLog_id(), getStart_time(), getEnd_time(), getAttack_type(), getSeverity(), getConditions(), getDestination_ip(), getDestination_country(), getSource_ip_list(), getSource_country_list(), getSession_rate(), getPacket_rate(), getBit_rate());
|
||||
public Object clone() throws CloneNotSupportedException {
|
||||
return super.clone();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ public class DosMetricsLog implements Serializable {
|
||||
private long packet_rate;
|
||||
private long bit_rate;
|
||||
private int partition_num;
|
||||
private int vsys_id;
|
||||
|
||||
public int getPartition_num() {
|
||||
return partition_num;
|
||||
@@ -69,6 +70,14 @@ public class DosMetricsLog implements Serializable {
|
||||
this.bit_rate = bit_rate;
|
||||
}
|
||||
|
||||
public int getVsys_id() {
|
||||
return vsys_id;
|
||||
}
|
||||
|
||||
public void setVsys_id(int vsys_id) {
|
||||
this.vsys_id = vsys_id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DosMetricsLog{" +
|
||||
@@ -79,29 +88,7 @@ public class DosMetricsLog implements Serializable {
|
||||
", packet_rate=" + packet_rate +
|
||||
", bit_rate=" + bit_rate +
|
||||
", partition_num=" + partition_num +
|
||||
", vsys_id=" + vsys_id +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof DosMetricsLog)) {
|
||||
return false;
|
||||
}
|
||||
DosMetricsLog that = (DosMetricsLog) o;
|
||||
return getSketch_start_time() == that.getSketch_start_time() &&
|
||||
getSession_rate() == that.getSession_rate() &&
|
||||
getPacket_rate() == that.getPacket_rate() &&
|
||||
getBit_rate() == that.getBit_rate() &&
|
||||
getPartition_num() == that.getPartition_num() &&
|
||||
Objects.equals(getAttack_type(), that.getAttack_type()) &&
|
||||
Objects.equals(getDestination_ip(), that.getDestination_ip());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(getSketch_start_time(), getAttack_type(), getDestination_ip(), getSession_rate(), getPacket_rate(), getBit_rate(), getPartition_num());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ public class DosSketchLog implements Serializable {
|
||||
private long sketch_sessions;
|
||||
private long sketch_packets;
|
||||
private long sketch_bytes;
|
||||
private int vsys_id;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
@@ -29,35 +30,10 @@ public class DosSketchLog implements Serializable {
|
||||
", sketch_sessions=" + sketch_sessions +
|
||||
", sketch_packets=" + sketch_packets +
|
||||
", sketch_bytes=" + sketch_bytes +
|
||||
", vsys_id=" + vsys_id +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof DosSketchLog)) {
|
||||
return false;
|
||||
}
|
||||
DosSketchLog sketchLog = (DosSketchLog) o;
|
||||
return getSketch_start_time() == sketchLog.getSketch_start_time() &&
|
||||
getSketch_duration() == sketchLog.getSketch_duration() &&
|
||||
getSketch_sessions() == sketchLog.getSketch_sessions() &&
|
||||
getSketch_packets() == sketchLog.getSketch_packets() &&
|
||||
getSketch_bytes() == sketchLog.getSketch_bytes() &&
|
||||
Objects.equals(getCommon_sled_ip(), sketchLog.getCommon_sled_ip()) &&
|
||||
Objects.equals(getCommon_data_center(), sketchLog.getCommon_data_center()) &&
|
||||
Objects.equals(getAttack_type(), sketchLog.getAttack_type()) &&
|
||||
Objects.equals(getSource_ip(), sketchLog.getSource_ip()) &&
|
||||
Objects.equals(getDestination_ip(), sketchLog.getDestination_ip());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(getCommon_sled_ip(), getCommon_data_center(), getSketch_start_time(), getSketch_duration(), getAttack_type(), getSource_ip(), getDestination_ip(), getSketch_sessions(), getSketch_packets(), getSketch_bytes());
|
||||
}
|
||||
|
||||
public String getCommon_sled_ip() {
|
||||
return common_sled_ip;
|
||||
}
|
||||
@@ -137,4 +113,12 @@ public class DosSketchLog implements Serializable {
|
||||
public void setSketch_bytes(long sketch_bytes) {
|
||||
this.sketch_bytes = sketch_bytes;
|
||||
}
|
||||
|
||||
public int getVsys_id() {
|
||||
return vsys_id;
|
||||
}
|
||||
|
||||
public void setVsys_id(int vsys_id) {
|
||||
this.vsys_id = vsys_id;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,22 +1,32 @@
|
||||
package com.zdjizhi.common;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class DosVsysId {
|
||||
private int vsysId;
|
||||
private Integer id;
|
||||
private Integer[] superiorIds;
|
||||
|
||||
public int getVsysId() {
|
||||
return vsysId;
|
||||
public Integer getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setVsysId(int vsysId) {
|
||||
this.vsysId = vsysId;
|
||||
public void setId(Integer id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public Integer[] getSuperiorIds() {
|
||||
return superiorIds;
|
||||
}
|
||||
|
||||
public void setSuperiorIds(Integer[] superiorIds) {
|
||||
this.superiorIds = superiorIds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DosVsysId{" +
|
||||
"vsysId=" + vsysId +
|
||||
"id=" + id +
|
||||
", superiorIds=" + Arrays.toString(superiorIds) +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,9 +7,10 @@ import inet.ipaddr.IPAddressString;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.text.StrBuilder;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.apache.flink.api.common.functions.RichMapFunction;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
|
||||
import org.apache.flink.streaming.api.functions.ProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -23,12 +24,12 @@ import java.util.concurrent.TimeUnit;
|
||||
/**
|
||||
* @author wlh
|
||||
*/
|
||||
public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);
|
||||
private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
|
||||
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
|
||||
private HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap;
|
||||
private HashMap<Integer,HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap;
|
||||
|
||||
private final static int BASELINE_SIZE = 144;
|
||||
private final static int STATIC_CONDITION_TYPE = 1;
|
||||
@@ -58,28 +59,40 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public DosEventLog map(DosSketchLog value) {
|
||||
DosEventLog finalResult = null;
|
||||
public void processElement(DosSketchLog value, Context ctx, Collector<DosEventLog> out) {
|
||||
ArrayList<DosEventLog> finalResults = new ArrayList<>();
|
||||
try {
|
||||
String destinationIp = value.getDestination_ip();
|
||||
int vsysId = value.getVsys_id();
|
||||
String key = destinationIp + "-" + vsysId;
|
||||
String attackType = value.getAttack_type();
|
||||
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
|
||||
DosDetectionThreshold threshold = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
|
||||
logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType);
|
||||
if (threshold == null && baselineMap.containsKey(destinationIp)) {
|
||||
finalResult = getDosEventLogByBaseline(value);
|
||||
} else if (threshold == null && !baselineMap.containsKey(destinationIp)) {
|
||||
finalResult = getDosEventLogBySensitivityThreshold(value);
|
||||
|
||||
DosDetectionThreshold threshold = null;
|
||||
if (thresholdRangeMap.containsKey(vsysId)){
|
||||
threshold = thresholdRangeMap.get(vsysId).getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
|
||||
}
|
||||
|
||||
logger.debug("当前判断IP:{}, 类型: {}", key, attackType);
|
||||
if (threshold == null && baselineMap.containsKey(key)) {
|
||||
DosEventLog finalResult = getDosEventLogByBaseline(value,key);
|
||||
finalResults.add(finalResult);
|
||||
} else if (threshold == null && !baselineMap.containsKey(key)) {
|
||||
DosEventLog finalResult = getDosEventLogBySensitivityThreshold(value);
|
||||
finalResults.add(finalResult);
|
||||
} else if (threshold != null) {
|
||||
finalResult = getDosEventLogByStaticThreshold(value, threshold);
|
||||
finalResults = getDosEventLogByStaticThreshold(value, threshold);
|
||||
} else {
|
||||
logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType);
|
||||
logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", key, attackType);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("判定失败\n {} \n{}", value, e);
|
||||
}
|
||||
return finalResult;
|
||||
|
||||
for (DosEventLog dosEventLog:finalResults){
|
||||
out.collect(dosEventLog);
|
||||
}
|
||||
}
|
||||
|
||||
private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
|
||||
@@ -93,13 +106,12 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
return result;
|
||||
}
|
||||
|
||||
private DosEventLog getDosEventLogByBaseline(DosSketchLog value) {
|
||||
private DosEventLog getDosEventLogByBaseline(DosSketchLog value,String key) {
|
||||
DosEventLog result = null;
|
||||
String destinationIp = value.getDestination_ip();
|
||||
String attackType = value.getAttack_type();
|
||||
long sketchSessions = value.getSketch_sessions();
|
||||
if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) {
|
||||
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(destinationIp).get(attackType);
|
||||
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType);
|
||||
Integer base = getBaseValue(dosBaselineThreshold, value);
|
||||
long diff = sketchSessions - base;
|
||||
result = getDosEventLog(value, base, diff, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
|
||||
@@ -107,7 +119,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
return result;
|
||||
}
|
||||
|
||||
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) {
|
||||
private ArrayList<DosEventLog> getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) throws CloneNotSupportedException {
|
||||
long base = threshold.getSessionsPerSec();
|
||||
long diff = value.getSketch_sessions() - base;
|
||||
DosEventLog result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, SESSIONS_TAG);
|
||||
@@ -121,7 +133,18 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, BITS_TAG);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
ArrayList<DosEventLog> dosEventLogs = new ArrayList<>();
|
||||
dosEventLogs.add(result);
|
||||
Integer[] superiorIds = threshold.getSuperiorIds();
|
||||
if (superiorIds != null && superiorIds.length > 0){
|
||||
for (Integer integer:superiorIds){
|
||||
DosEventLog clone = (DosEventLog) result.clone();
|
||||
clone.setVsys_id(integer);
|
||||
clone.setLog_id(SnowflakeId.generateId());
|
||||
dosEventLogs.add(clone);
|
||||
}
|
||||
}
|
||||
return dosEventLogs;
|
||||
}
|
||||
|
||||
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, int type, String tag) {
|
||||
@@ -148,6 +171,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
private DosEventLog getResult(DosSketchLog value, long base, Severity severity, double percent, int type, String tag) {
|
||||
DosEventLog dosEventLog = new DosEventLog();
|
||||
dosEventLog.setLog_id(SnowflakeId.generateId());
|
||||
dosEventLog.setVsys_id(value.getVsys_id());
|
||||
dosEventLog.setStart_time(value.getSketch_start_time());
|
||||
dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
|
||||
dosEventLog.setAttack_type(value.getAttack_type());
|
||||
|
||||
@@ -3,7 +3,7 @@ package com.zdjizhi.etl;
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.apache.flink.api.java.tuple.Tuple6;
|
||||
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
|
||||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
|
||||
@@ -18,14 +18,14 @@ import static com.zdjizhi.sink.OutputStreamSink.outputTag;
|
||||
/**
|
||||
* @author 94976
|
||||
*/
|
||||
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String,String>, TimeWindow> {
|
||||
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple3<String,String,Integer>, TimeWindow> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
|
||||
private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
|
||||
private static final String EMPTY_SOURCE_IP_IPV6 = "::";
|
||||
|
||||
@Override
|
||||
public void process(Tuple2<String, String> keys,
|
||||
public void process(Tuple3<String,String,Integer> keys,
|
||||
Context context, Iterable<DosSketchLog> elements,
|
||||
Collector<DosSketchLog> out) {
|
||||
DosSketchLog middleResult = getMiddleResult(keys, elements);
|
||||
@@ -40,7 +40,7 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
|
||||
}
|
||||
}
|
||||
|
||||
private DosSketchLog getMiddleResult(Tuple2<String, String> keys,Iterable<DosSketchLog> elements){
|
||||
private DosSketchLog getMiddleResult(Tuple3<String,String,Integer> keys,Iterable<DosSketchLog> elements){
|
||||
|
||||
DosSketchLog midResuleLog = new DosSketchLog();
|
||||
Tuple6<Long, Long, Long,String,Long,Long> values = sketchAggregate(elements);
|
||||
@@ -48,6 +48,7 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
|
||||
if (values != null){
|
||||
midResuleLog.setAttack_type(keys.f0);
|
||||
midResuleLog.setDestination_ip(keys.f1);
|
||||
midResuleLog.setVsys_id(keys.f2);
|
||||
midResuleLog.setSketch_start_time(values.f4);
|
||||
midResuleLog.setSketch_duration(values.f5);
|
||||
midResuleLog.setSource_ip(values.f3);
|
||||
|
||||
@@ -52,12 +52,14 @@ public class ParseSketchLog {
|
||||
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
|
||||
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
|
||||
String attackType = sketchSource.get("attack_type").toString();
|
||||
int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
|
||||
ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
|
||||
for (HashMap<String, Object> obj : reportIpList) {
|
||||
DosSketchLog dosSketchLog = new DosSketchLog();
|
||||
dosSketchLog.setSketch_start_time(sketchStartTime);
|
||||
dosSketchLog.setSketch_duration(sketchDuration);
|
||||
dosSketchLog.setAttack_type(attackType);
|
||||
dosSketchLog.setVsys_id(vsysId);
|
||||
String sourceIp = obj.get("source_ip").toString();
|
||||
String destinationIp = obj.get("destination_ip").toString();
|
||||
long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.zdjizhi.common.DosDetectionThreshold;
|
||||
import com.zdjizhi.common.DosVsysId;
|
||||
import com.zdjizhi.utils.HttpClientUtils;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.NacosUtils;
|
||||
import inet.ipaddr.IPAddress;
|
||||
import inet.ipaddr.IPAddressString;
|
||||
import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
|
||||
@@ -112,8 +113,9 @@ public class ParseStaticThreshold {
|
||||
HashMap<String, Object> parms = new HashMap<>();
|
||||
parms.put("pageSize", -1);
|
||||
parms.put("orderBy", "vsysId desc");
|
||||
parms.put("type", 1);
|
||||
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms);
|
||||
String token = CommonConfig.BIFANG_SERVER_TOKEN;
|
||||
String token = NacosUtils.getStringProperty("bifang.server.token");
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
|
||||
BasicHeader authorization = new BasicHeader("Authorization", token);
|
||||
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
|
||||
@@ -127,7 +129,7 @@ public class ParseStaticThreshold {
|
||||
Object list = data.get("list");
|
||||
if (list != null) {
|
||||
vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
|
||||
logger.info("获取到vsysId{}条", vsysIdList.size());
|
||||
logger.info("获取到vsysId {}条", vsysIdList.size());
|
||||
} else {
|
||||
logger.warn("vsysIdList为空");
|
||||
}
|
||||
@@ -148,49 +150,54 @@ public class ParseStaticThreshold {
|
||||
* @return thresholds
|
||||
*/
|
||||
private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
|
||||
ArrayList<DosDetectionThreshold> thresholds = null;
|
||||
// ArrayList<DosVsysId> vsysId = getVsysId();
|
||||
ArrayList<DosDetectionThreshold> vsysThresholds = new ArrayList<>();
|
||||
ArrayList<DosVsysId> vsysIds = getVsysId();
|
||||
try {
|
||||
// if (vsysId != null){
|
||||
// for (DosVsysId dosVsysId : vsysId) {
|
||||
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
|
||||
HashMap<String, Object> parms = new HashMap<>();
|
||||
parms.put("pageSize", -1);
|
||||
parms.put("orderBy", "profileId asc");
|
||||
parms.put("isValid", 1);
|
||||
// parms.put("vsysId", dosVsysId.getVsysId());
|
||||
parms.put("vsysId", 1);
|
||||
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
|
||||
String token = CommonConfig.BIFANG_SERVER_TOKEN;
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
|
||||
BasicHeader authorization = new BasicHeader("Authorization", token);
|
||||
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
|
||||
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
|
||||
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
|
||||
boolean success = (boolean) resposeMap.get("success");
|
||||
String msg = resposeMap.get("msg").toString();
|
||||
if (success) {
|
||||
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
|
||||
Object list = data.get("list");
|
||||
if (list != null) {
|
||||
thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
|
||||
logger.info("获取到静态阈值配置{}条", thresholds.size());
|
||||
} else {
|
||||
logger.warn("静态阈值配置为空");
|
||||
}
|
||||
} else {
|
||||
logger.error(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
// }
|
||||
// }
|
||||
if (vsysIds != null) {
|
||||
for (DosVsysId dosVsysId : vsysIds) {
|
||||
Integer vsysId = dosVsysId.getId();
|
||||
Integer[] superiorIds = dosVsysId.getSuperiorIds();
|
||||
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
|
||||
HashMap<String, Object> parms = new HashMap<>();
|
||||
parms.put("pageSize", -1);
|
||||
parms.put("orderBy", "profileId asc");
|
||||
parms.put("isValid", 1);
|
||||
parms.put("vsysId", vsysId);
|
||||
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
|
||||
String token = NacosUtils.getStringProperty("bifang.server.token");
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
|
||||
BasicHeader authorization = new BasicHeader("Authorization", token);
|
||||
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
|
||||
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
|
||||
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
|
||||
boolean success = (boolean) resposeMap.get("success");
|
||||
String msg = resposeMap.get("msg").toString();
|
||||
if (success) {
|
||||
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
|
||||
Object list = data.get("list");
|
||||
if (list != null) {
|
||||
ArrayList<DosDetectionThreshold> thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
|
||||
for (DosDetectionThreshold dosDetectionThreshold:thresholds){
|
||||
dosDetectionThreshold.setSuperiorIds(superiorIds);
|
||||
vsysThresholds.add(dosDetectionThreshold);
|
||||
}
|
||||
logger.info("获取到vsys id是{}静态阈值配置{}条",vsysId, thresholds.size());
|
||||
} else {
|
||||
logger.warn("静态阈值配置为空");
|
||||
}
|
||||
} else {
|
||||
logger.error(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e);
|
||||
}
|
||||
|
||||
return thresholds;
|
||||
return vsysThresholds;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -198,14 +205,17 @@ public class ParseStaticThreshold {
|
||||
*
|
||||
* @return threshold RangeMap
|
||||
*/
|
||||
static HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> createStaticThreshold() {
|
||||
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap = new HashMap<>(4);
|
||||
static HashMap<Integer,HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> createStaticThreshold() {
|
||||
HashMap<Integer,HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap = new HashMap<>(4);
|
||||
try {
|
||||
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
|
||||
if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
|
||||
for (DosDetectionThreshold threshold : dosDetectionThreshold) {
|
||||
String attackType = threshold.getAttackType();
|
||||
TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create());
|
||||
int vsysId = threshold.getVsysId();
|
||||
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> rangeMap = thresholdRangeMap.getOrDefault(vsysId, new HashMap<>());
|
||||
|
||||
TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = rangeMap.getOrDefault(attackType, TreeRangeMap.create());
|
||||
ArrayList<String> serverIpList = threshold.getServerIpList();
|
||||
for (String sip : serverIpList) {
|
||||
IPAddressString ipAddressString = new IPAddressString(sip);
|
||||
@@ -238,7 +248,8 @@ public class ParseStaticThreshold {
|
||||
}
|
||||
}
|
||||
}
|
||||
thresholdRangeMap.put(attackType, treeRangeMap);
|
||||
rangeMap.put(attackType, treeRangeMap);
|
||||
thresholdRangeMap.put(vsysId,rangeMap);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@@ -248,22 +259,27 @@ public class ParseStaticThreshold {
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
/*
|
||||
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
|
||||
dosDetectionThreshold.forEach(System.out::println);
|
||||
|
||||
|
||||
// dosDetectionThreshold.forEach(System.out::println);
|
||||
getVsysId().forEach(System.out::println);
|
||||
System.out.println("------------------------");
|
||||
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
|
||||
*/
|
||||
HashMap<Integer,HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> staticThreshold = createStaticThreshold();
|
||||
|
||||
System.out.println("------------------------");
|
||||
|
||||
for (String type : staticThreshold.keySet()) {
|
||||
Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = staticThreshold.get(type).asMapOfRanges();
|
||||
for (Range<IPAddress> range : asMapOfRanges.keySet()) {
|
||||
DosDetectionThreshold threshold = asMapOfRanges.get(range);
|
||||
System.out.println(type + "---" + range + "---" + threshold);
|
||||
for (Integer integer : staticThreshold.keySet()){
|
||||
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> stringTreeRangeMapHashMap = staticThreshold.get(integer);
|
||||
for (String type : stringTreeRangeMapHashMap.keySet()) {
|
||||
Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = stringTreeRangeMapHashMap.get(type).asMapOfRanges();
|
||||
for (Range<IPAddress> range : asMapOfRanges.keySet()) {
|
||||
DosDetectionThreshold threshold = asMapOfRanges.get(range);
|
||||
System.out.println(integer+"---"+type + "---" + range + "---" + threshold);
|
||||
}
|
||||
System.out.println("------------------------");
|
||||
}
|
||||
System.out.println("------------------------");
|
||||
|
||||
}
|
||||
// String s = loginBifangServer();
|
||||
// System.out.println(s);
|
||||
|
||||
@@ -19,6 +19,7 @@ class TrafficServerIpMetrics {
|
||||
dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions());
|
||||
dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets());
|
||||
dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes());
|
||||
dosMetricsLog.setVsys_id(midResuleLog.getVsys_id());
|
||||
dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip()));
|
||||
logger.debug("metric 结果已加载:{}",dosMetricsLog.toString());
|
||||
return dosMetricsLog;
|
||||
|
||||
@@ -9,7 +9,7 @@ import com.zdjizhi.etl.EtlProcessFunction;
|
||||
import com.zdjizhi.etl.ParseSketchLog;
|
||||
import com.zdjizhi.utils.FlinkEnvironmentUtils;
|
||||
import org.apache.flink.api.java.functions.KeySelector;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.apache.flink.streaming.api.datastream.*;
|
||||
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
|
||||
import org.apache.flink.streaming.api.windowing.time.Time;
|
||||
@@ -37,7 +37,7 @@ public class OutputStreamSink {
|
||||
}
|
||||
|
||||
private static SingleOutputStreamOperator<DosEventLog> getEventSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
|
||||
return middleStream.map(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
|
||||
return middleStream.process(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
|
||||
}
|
||||
|
||||
private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){
|
||||
@@ -48,12 +48,13 @@ public class OutputStreamSink {
|
||||
.setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM);
|
||||
}
|
||||
|
||||
private static class KeysSelector implements KeySelector<DosSketchLog, Tuple2<String, String>>{
|
||||
private static class KeysSelector implements KeySelector<DosSketchLog, Tuple3<String, String, Integer>>{
|
||||
@Override
|
||||
public Tuple2<String, String> getKey(DosSketchLog dosSketchLog){
|
||||
return Tuple2.of(
|
||||
public Tuple3<String, String, Integer> getKey(DosSketchLog dosSketchLog){
|
||||
return Tuple3.of(
|
||||
dosSketchLog.getAttack_type(),
|
||||
dosSketchLog.getDestination_ip());
|
||||
dosSketchLog.getDestination_ip(),
|
||||
dosSketchLog.getVsys_id());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ kafka.input.topic.name=DOS-SKETCH-RECORD
|
||||
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
|
||||
|
||||
#读取kafka group id
|
||||
kafka.input.group.id=2112080949
|
||||
kafka.input.group.id=dos-detection-job-220816-1
|
||||
#kafka.input.group.id=dos-detection-job-210813-1
|
||||
|
||||
#发送kafka metrics并行度大小
|
||||
@@ -51,7 +51,7 @@ hbase.baseline.table.name=dos:ddos_traffic_baselines
|
||||
hbase.baseline.total.num=1000000
|
||||
|
||||
#baseline ttl,单位:天
|
||||
hbase.baseline.ttl=30
|
||||
hbase.baseline.ttl=1
|
||||
|
||||
#设置聚合并行度,2个key
|
||||
flink.first.agg.parallelism=1
|
||||
@@ -78,25 +78,9 @@ ip.mmdb.path=D:\\data\\dat\\
|
||||
#ip.mmdb.path=/home/bigdata/topology/dat/
|
||||
#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
|
||||
|
||||
##静态敏感阈值,速率小于此值不报警
|
||||
#static.sensitivity.threshold=500
|
||||
#
|
||||
##基线敏感阈值
|
||||
#baseline.sensitivity.threshold=0.2
|
||||
#
|
||||
##基于baseline判定dos攻击的上下限
|
||||
#baseline.sessions.minor.threshold=0.5
|
||||
#baseline.sessions.warning.threshold=1
|
||||
#baseline.sessions.major.threshold=2.5
|
||||
#baseline.sessions.severe.threshold=5
|
||||
#baseline.sessions.critical.threshold=8
|
||||
|
||||
#bifang服务访问地址
|
||||
bifang.server.uri=http://192.168.44.72:80
|
||||
#bifang.server.uri=http://192.168.44.3:80
|
||||
|
||||
#访问bifang只读权限token,bifang内置,无需修改
|
||||
bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867
|
||||
#bifang.server.uri=http://192.168.44.72:80
|
||||
bifang.server.uri=http://192.168.44.3:80
|
||||
|
||||
#加密密码路径信息
|
||||
bifang.server.encryptpwd.path=/v1/user/encryptpwd
|
||||
@@ -135,15 +119,14 @@ baseline.threshold.schedule.days=1
|
||||
#kafka用户认证配置参数
|
||||
sasl.jaas.config.user=admin
|
||||
#sasl.jaas.config.password=galaxy2019
|
||||
#sasl.jaas.config.password=ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)
|
||||
sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
|
||||
|
||||
#是否开启kafka用户认证配置,1:是;0:否
|
||||
sasl.jaas.config.flag=1
|
||||
|
||||
#nacos配置
|
||||
nacos.server.addr=192.168.40.42:8848
|
||||
nacos.namespace=prod
|
||||
nacos.server.addr=192.168.44.12:8848
|
||||
nacos.namespace=test
|
||||
nacos.username=nacos
|
||||
nacos.password=nacos
|
||||
nacos.data.id=dos_detection.properties
|
||||
|
||||
@@ -41,7 +41,7 @@ public class NacosTest {
|
||||
|
||||
private void getProperties() {
|
||||
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
|
||||
properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
|
||||
properties.setProperty(PropertyKeyConst.NAMESPACE, "test");
|
||||
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
|
||||
properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
|
||||
}
|
||||
@@ -56,7 +56,7 @@ public class NacosTest {
|
||||
Properties nacosConfigMap = new Properties();
|
||||
nacosConfigMap.load(new StringReader(content));
|
||||
System.out.println(nacosConfigMap.getProperty("static.sensitivity.threshold"));
|
||||
} catch (NacosException | IOException e) {
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ public class NacosTest {
|
||||
System.out.println(configMsg);
|
||||
}
|
||||
});
|
||||
} catch (NacosException e) {
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user