diff --git a/pom.xml b/pom.xml
index 89a711c..00dac4c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,14 +6,14 @@
com.zdjizhi
flink-dos-detection
- 1.0
+ 2.0
flink-dos-detection
http://www.example.com
- 1.2.1
+ 2.0.2
1.13.1
2.1.1
2.7.1
@@ -56,8 +56,8 @@
maven-compiler-plugin
3.8.0
- 1.8
- 1.8
+ 11
+ 11
false
@@ -135,24 +135,12 @@
-
org.jasypt
jasypt
1.9.3
-
-
-
-
-
-
-
-
-
-
-
com.jayway.jsonpath
@@ -164,8 +152,16 @@
org.apache.flink
flink-connector-kafka_2.12
${flink.version}
+ provided
+
+
+
org.apache.flink
@@ -203,13 +199,6 @@
-
-
-
-
-
-
-
org.apache.hbase
hbase-client
@@ -251,11 +240,6 @@
-
- org.apache.httpcomponents
- httpclient
- 4.5.6
-
cn.hutool
@@ -276,30 +260,7 @@
2.0.32
-
- com.alibaba.nacos
- nacos-client
- 1.2.0
-
-
- com.fasterxml.jackson.core
- jackson-databind
-
-
- com.google.guava
- guava
-
-
- commons-codec
- commons-codec
-
-
- slf4j-api
- org.slf4j
-
-
-
-
+
commons-codec
commons-codec
@@ -319,13 +280,15 @@
-
+
+
com.google.guava
guava
- 22.0
+ 11.0.2
+
org.projectlombok
lombok
@@ -342,6 +305,7 @@
org.apache.flink
flink-streaming-java_${scala.binary.version}
${flink.version}
+ provided
diff --git a/src/main/java/com/zdjizhi/common/DosEventLog.java b/src/main/java/com/zdjizhi/common/DosEventLog.java
index caa8ac0..d3ec100 100644
--- a/src/main/java/com/zdjizhi/common/DosEventLog.java
+++ b/src/main/java/com/zdjizhi/common/DosEventLog.java
@@ -19,7 +19,11 @@ public class DosEventLog implements Serializable, Cloneable {
private long session_rate;
private long packet_rate;
private long bit_rate;
+ private long sessions;
+ private long packets;
+ private long bytes;
+ private int rule_id;
public long getRecv_time() {
return recv_time;
}
@@ -148,6 +152,38 @@ public class DosEventLog implements Serializable, Cloneable {
this.bit_rate = bit_rate;
}
+ public long getSessions() {
+ return sessions;
+ }
+
+ public void setSessions(long sessions) {
+ this.sessions = sessions;
+ }
+
+ public long getPackets() {
+ return packets;
+ }
+
+ public void setPackets(long packets) {
+ this.packets = packets;
+ }
+
+ public long getBytes() {
+ return bytes;
+ }
+
+ public void setBytes(long bytes) {
+ this.bytes = bytes;
+ }
+
+ public int getRule_id() {
+ return rule_id;
+ }
+
+ public void setRule_id(int rule_id) {
+ this.rule_id = rule_id;
+ }
+
@Override
public String toString() {
return "DosEventLog{" +
@@ -174,4 +210,5 @@ public class DosEventLog implements Serializable, Cloneable {
public Object clone() throws CloneNotSupportedException {
return super.clone();
}
+
}
diff --git a/src/main/java/com/zdjizhi/common/DosSketchLog.java b/src/main/java/com/zdjizhi/common/DosSketchLog.java
index d4d42ec..24c65d2 100644
--- a/src/main/java/com/zdjizhi/common/DosSketchLog.java
+++ b/src/main/java/com/zdjizhi/common/DosSketchLog.java
@@ -1,80 +1,227 @@
package com.zdjizhi.common;
import java.io.Serializable;
+import java.util.Map;
import java.util.Objects;
public class DosSketchLog implements Serializable {
- private long common_recv_time;
- private String common_sled_ip;
- private String common_data_center;
- private long sketch_start_time;
- private long sketch_duration;
- private String attack_type;
- private String source_ip;
- private String destination_ip;
- private long sketch_sessions;
- private long sketch_packets;
- private long sketch_bytes;
+ private String name;
+ private long recv_time;
+ private long timestamp_ms;
+ private String device_id;
+ private String data_center;
+ private String device_group;
+ private String decoded_as;
+ private int rule_id;
+ private String client_country;
+ private String server_country;
+ private String client_ip;
+ private String server_ip;
+ private long sessions;
+ private long pkts;
+ private long bytes;
private int vsys_id;
-
+ private long start_timestamp_ms;
+ private long end_timestamp_ms;
+ private long duration;
+ private Map clientips_countrys;
+ private long session_rate;
+ private long packet_rate;
+ private long bit_rate;
+ private String attack_type;
@Override
public String toString() {
return "DosSketchLog{" +
- "common_recv_time=" + common_recv_time +
- ", common_sled_ip='" + common_sled_ip + '\'' +
- ", common_data_center='" + common_data_center + '\'' +
- ", sketch_start_time=" + sketch_start_time +
- ", sketch_duration=" + sketch_duration +
- ", attack_type='" + attack_type + '\'' +
- ", source_ip='" + source_ip + '\'' +
- ", destination_ip='" + destination_ip + '\'' +
- ", sketch_sessions=" + sketch_sessions +
- ", sketch_packets=" + sketch_packets +
- ", sketch_bytes=" + sketch_bytes +
+ "name=" + name +
+ ", timestamp_ms='" + timestamp_ms + '\'' +
+ ", device_id='" + device_id + '\'' +
+ ", data_center=" + data_center +
+ ", device_group=" + device_group +
+ ", decoded_as='" + decoded_as + '\'' +
+ ", client_country='" + client_country + '\'' +
+ ", server_country='" + server_country + '\'' +
+ ", client_ip=" + client_ip +
+ ", server_ip=" + server_ip +
", vsys_id=" + vsys_id +
'}';
}
- public long getCommon_recv_time() {
- return common_recv_time;
+ public long getRecv_time() {
+ return recv_time;
}
- public void setCommon_recv_time(long common_recv_time) {
- this.common_recv_time = common_recv_time;
+ public void setRecv_time(long recv_time) {
+ this.recv_time = recv_time;
}
- public String getCommon_sled_ip() {
- return common_sled_ip;
+ public String getName() {
+ return name;
}
- public void setCommon_sled_ip(String common_sled_ip) {
- this.common_sled_ip = common_sled_ip;
+ public void setName(String name) {
+ this.name = name;
}
- public String getCommon_data_center() {
- return common_data_center;
+ public long getTimestamp_ms() {
+ return timestamp_ms;
}
- public void setCommon_data_center(String common_data_center) {
- this.common_data_center = common_data_center;
+ public void setTimestamp_ms(long timestamp_ms) {
+ this.timestamp_ms = timestamp_ms;
}
- public long getSketch_start_time() {
- return sketch_start_time;
+ public String getDevice_id() {
+ return device_id;
}
- public void setSketch_start_time(long sketch_start_time) {
- this.sketch_start_time = sketch_start_time;
+ public void setDevice_id(String device_id) {
+ this.device_id = device_id;
}
- public long getSketch_duration() {
- return sketch_duration;
+ public String getData_center() {
+ return data_center;
}
- public void setSketch_duration(long sketch_duration) {
- this.sketch_duration = sketch_duration;
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+ public String getDecoded_as() {
+ return decoded_as;
+ }
+
+ public void setDecoded_as(String decoded_as) {
+ this.decoded_as = decoded_as;
+ }
+
+ public String getClient_country() {
+ return client_country;
+ }
+
+ public void setClient_country(String client_country) {
+ this.client_country = client_country;
+ }
+
+ public String getServer_country() {
+ return server_country;
+ }
+
+ public void setServer_country(String server_country) {
+ this.server_country = server_country;
+ }
+
+ public String getClient_ip() {
+ return client_ip;
+ }
+
+ public void setClient_ip(String client_ip) {
+ this.client_ip = client_ip;
+ }
+
+ public String getServer_ip() {
+ return server_ip;
+ }
+
+ public void setServer_ip(String server_ip) {
+ this.server_ip = server_ip;
+ }
+
+ public long getSessions() {
+ return sessions;
+ }
+
+ public void setSessions(long sessions) {
+ this.sessions = sessions;
+ }
+
+ public long getPkts() {
+ return pkts;
+ }
+
+ public void setPkts(long pkts) {
+ this.pkts = pkts;
+ }
+
+ public long getBytes() {
+ return bytes;
+ }
+
+ public void setBytes(long bytes) {
+ this.bytes = bytes;
+ }
+
+ public int getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(int vsys_id) {
+ this.vsys_id = vsys_id;
+ }
+
+ public long getStart_timestamp_ms() {
+ return start_timestamp_ms;
+ }
+
+ public void setStart_timestamp_ms(long start_timestamp_ms) {
+ this.start_timestamp_ms = start_timestamp_ms;
+ }
+
+ public long getEnd_timestamp_ms() {
+ return end_timestamp_ms;
+ }
+
+ public void setEnd_timestamp_ms(long end_timestamp_ms) {
+ this.end_timestamp_ms = end_timestamp_ms;
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public void setDuration(long duration) {
+ this.duration = duration;
+ }
+
+ public Map getClientips_countrys() {
+ return clientips_countrys;
+ }
+
+ public void setClientips_countrys(Map clientips_countrys) {
+ this.clientips_countrys = clientips_countrys;
+ }
+
+ public long getSession_rate() {
+ return session_rate;
+ }
+
+ public void setSession_rate(long session_rate) {
+ this.session_rate = session_rate;
+ }
+
+ public long getPacket_rate() {
+ return packet_rate;
+ }
+
+ public void setPacket_rate(long packet_rate) {
+ this.packet_rate = packet_rate;
+ }
+
+ public long getBit_rate() {
+ return bit_rate;
+ }
+
+ public void setBit_rate(long bit_rate) {
+ this.bit_rate = bit_rate;
}
public String getAttack_type() {
@@ -85,51 +232,12 @@ public class DosSketchLog implements Serializable {
this.attack_type = attack_type;
}
- public String getSource_ip() {
- return source_ip;
+
+ public int getRule_id() {
+ return rule_id;
}
- public void setSource_ip(String source_ip) {
- this.source_ip = source_ip;
- }
-
- public String getDestination_ip() {
- return destination_ip;
- }
-
- public void setDestination_ip(String destination_ip) {
- this.destination_ip = destination_ip;
- }
-
- public long getSketch_sessions() {
- return sketch_sessions;
- }
-
- public void setSketch_sessions(long sketch_sessions) {
- this.sketch_sessions = sketch_sessions;
- }
-
- public long getSketch_packets() {
- return sketch_packets;
- }
-
- public void setSketch_packets(long sketch_packets) {
- this.sketch_packets = sketch_packets;
- }
-
- public long getSketch_bytes() {
- return sketch_bytes;
- }
-
- public void setSketch_bytes(long sketch_bytes) {
- this.sketch_bytes = sketch_bytes;
- }
-
- public int getVsys_id() {
- return vsys_id;
- }
-
- public void setVsys_id(int vsys_id) {
- this.vsys_id = vsys_id;
+ public void setRule_id(int rule_id) {
+ this.rule_id = rule_id;
}
}
diff --git a/src/main/java/com/zdjizhi/common/DosVsysId.java b/src/main/java/com/zdjizhi/common/DosVsysId.java
deleted file mode 100644
index b5465f2..0000000
--- a/src/main/java/com/zdjizhi/common/DosVsysId.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.zdjizhi.common;
-
-import java.util.Arrays;
-
-public class DosVsysId {
- private Integer id;
- private Integer[] superior_ids;
-
- public Integer getId() {
- return id;
- }
-
- public void setId(Integer id) {
- this.id = id;
- }
-
- public Integer[] getSuperior_ids() {
- return superior_ids;
- }
-
- public void setSuperior_ids(Integer[] superior_ids) {
- this.superior_ids = superior_ids;
- }
-
- @Override
- public String toString() {
- return "DosVsysId{" +
- "id=" + id +
- ", superior_ids=" + Arrays.toString(superior_ids) +
- '}';
- }
-}
diff --git a/src/main/java/com/zdjizhi/common/pojo/DosSketchMetricsLog.java b/src/main/java/com/zdjizhi/common/pojo/DosSketchMetricsLog.java
new file mode 100644
index 0000000..d5d6923
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/pojo/DosSketchMetricsLog.java
@@ -0,0 +1,42 @@
+package com.zdjizhi.common.pojo;
+
+import java.util.Map;
+
+public class DosSketchMetricsLog {
+ private String name;
+ private Map tags;
+ private Map fields;
+ private long timestamp_ms;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Map getTags() {
+ return tags;
+ }
+
+ public void setTags(Map tags) {
+ this.tags = tags;
+ }
+
+ public Map getFields() {
+ return fields;
+ }
+
+ public void setFields(Map fields) {
+ this.fields = fields;
+ }
+
+ public long getTimestamp_ms() {
+ return timestamp_ms;
+ }
+
+ public void setTimestamp_ms(long timestamp_ms) {
+ this.timestamp_ms = timestamp_ms;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/pojo/KnowlegeBaseMeta.java b/src/main/java/com/zdjizhi/common/pojo/KnowlegeBaseMeta.java
deleted file mode 100644
index f6562cc..0000000
--- a/src/main/java/com/zdjizhi/common/pojo/KnowlegeBaseMeta.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package com.zdjizhi.common.pojo;
-
-import java.io.Serializable;
-
-/**
- *
- */
-public class KnowlegeBaseMeta implements Serializable {
- private String kb_id;
- private String name;
- private String sha256;
- private String format;
- private String path;
-
- public KnowlegeBaseMeta(String kd_id, String name, String sha256, String format, String path) {
- this.kb_id = kd_id;
- this.name = name;
- this.sha256 = sha256;
- this.format = format;
- this.path = path;
- }
-
- public String getKb_id() {
- return kb_id;
- }
-
- public void setKb_id(String kb_id) {
- this.kb_id = kb_id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getSha256() {
- return sha256;
- }
-
- public void setSha256(String sha256) {
- this.sha256 = sha256;
- }
-
- public String getFormat() {
- return format;
- }
-
- public void setFormat(String format) {
- this.format = format;
- }
-
- public String getPath() {
- return path;
- }
-
- public void setPath(String path) {
- this.path = path;
- }
-
- @Override
- public String toString() {
- return "KnowlegeBaseMeta{" +
- "kb_id='" + kb_id + '\'' +
- ", name='" + name + '\'' +
- ", sha256='" + sha256 + '\'' +
- ", format='" + format + '\'' +
- ", path='" + path + '\'' +
- '}';
- }
-}
-
diff --git a/src/main/java/com/zdjizhi/conf/DosConfigs.java b/src/main/java/com/zdjizhi/conf/DosConfigs.java
index 9614ed3..232ac4a 100644
--- a/src/main/java/com/zdjizhi/conf/DosConfigs.java
+++ b/src/main/java/com/zdjizhi/conf/DosConfigs.java
@@ -52,10 +52,10 @@ public class DosConfigs {
.stringType()
.noDefaultValue();
- public static final ConfigOption BIFANG_SERVER_URI =
+/* public static final ConfigOption BIFANG_SERVER_URI =
ConfigOptions.key("bifang.server.uri")
.stringType()
- .noDefaultValue();
+ .noDefaultValue();*/
public static final ConfigOption KNOWLEDGE_BASE_URL =
ConfigOptions.key("knowledge.base.uri")
@@ -114,10 +114,10 @@ public class DosConfigs {
.defaultValue("/v1/knowledge_base");
- public static final ConfigOption STATIC_THRESHOLD_SCHEDULE_MINUTES =
+/* public static final ConfigOption STATIC_THRESHOLD_SCHEDULE_MINUTES =
ConfigOptions.key("static.threshold.schedule.minutes")
.intType()
- .defaultValue(10);
+ .defaultValue(10);*/
public static final ConfigOption BASELINE_THRESHOLD_SCHEDULE_DAYS =
@@ -161,7 +161,7 @@ public class DosConfigs {
.defaultValue(8.0);
- public static final ConfigOption BIFANG_SERVER_ENCRYPTPWD_PATH =
+ /*public static final ConfigOption BIFANG_SERVER_ENCRYPTPWD_PATH =
ConfigOptions.key("bifang.server.encryptpwd.path")
.stringType()
.defaultValue("/v1/user/encryptpwd");
@@ -184,7 +184,7 @@ public class DosConfigs {
public static final ConfigOption BIFANG_SERVER_LOGIN_PATH =
ConfigOptions.key("bifang.server.login.path")
.stringType()
- .defaultValue("/v1/user/login");
+ .defaultValue("/v1/user/login");*/
public static final ConfigOption HTTP_POOL_MAX_CONNECTION =
ConfigOptions.key("http.pool.max.connection")
diff --git a/src/main/java/com/zdjizhi/function/DosDetectionFunction.java b/src/main/java/com/zdjizhi/function/DosDetectionFunction.java
index 6662873..91623a4 100644
--- a/src/main/java/com/zdjizhi/function/DosDetectionFunction.java
+++ b/src/main/java/com/zdjizhi/function/DosDetectionFunction.java
@@ -3,18 +3,12 @@ package com.zdjizhi.function;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.geedgenetworks.utils.DateUtils;
-import com.geedgenetworks.utils.StringUtil;
import com.zdjizhi.common.*;
import com.zdjizhi.utils.Snowflakeld.SnowflakeId;
import com.zdjizhi.utils.Threshold.ParseBaselineThreshold;
-import com.zdjizhi.utils.Threshold.ParseStaticThreshold;
-import com.zdjizhi.utils.connections.http.HttpClientService;
-import com.zdjizhi.utils.knowledgebase.IpLookupUtils;
-import inet.ipaddr.IPAddress;
-import inet.ipaddr.IPAddressString;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
@@ -31,7 +25,7 @@ public class DosDetectionFunction extends ProcessFunction> baselineMap = new HashMap<>();
private final NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
- private HashMap>> thresholdRangeMap;
+ // private HashMap>> thresholdRangeMap;
private final int BASELINE_SIZE = 144;
private final int STATIC_CONDITION_TYPE = 1;
private final int BASELINE_CONDITION_TYPE = 2;
@@ -42,51 +36,17 @@ public class DosDetectionFunction extends ProcessFunction out) throws Exception {
DosEventLog finalResult = null;
try {
- String destinationIp = value.getDestination_ip();
- int vsysId = value.getVsys_id();
- String key = destinationIp + "-" + vsysId;
- String attackType = value.getAttack_type();
- IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
-
- DosDetectionThreshold threshold = null;
- if (thresholdRangeMap.containsKey(vsysId)) {
- threshold = thresholdRangeMap.get(vsysId).getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
+ if (value.getRule_id() == 0) {
+ String destinationIp = value.getServer_ip();
+ int vsysId = value.getVsys_id();
+ String key = destinationIp + "-" + vsysId;
+ String attackType = value.getAttack_type();
+ DosDetectionThreshold threshold = null;
+ logger.debug("当前判断IP:{}, 类型: {}", key, attackType);
+ if (threshold == null && baselineMap.containsKey(key)) {
+ finalResult = getDosEventLogByBaseline(value, key);
+ } else if (threshold == null && !baselineMap.containsKey(key)) {
+ finalResult = getDosEventLogBySensitivityThreshold(value);
+ }
+ else {
+ logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", key, attackType);
+ }
+ }
+ else{
+ finalResult = getResult(value,0,0,Severity.MAJOR,0.0,0,"DoS Protection [12]");
+ }
+ } catch(Exception e){
+ logger.error("判定失败\n {} \n{}", value, e);
}
- logger.debug("当前判断IP:{}, 类型: {}", key, attackType);
- if (threshold == null && baselineMap.containsKey(key)) {
- finalResult = getDosEventLogByBaseline(value, key);
- } else if (threshold == null && !baselineMap.containsKey(key)) {
- finalResult = getDosEventLogBySensitivityThreshold(value);
- } else if (threshold != null) {
- finalResult = getDosEventLogByStaticThreshold(value, threshold);
- } else {
- logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", key, attackType);
- }
-
- } catch (Exception e) {
- logger.error("判定失败\n {} \n{}", value, e);
- }
if (finalResult != null) {
out.collect(finalResult);
}
@@ -142,7 +100,7 @@ public class DosDetectionFunction extends ProcessFunction 0) {
- diffSessionPercent = getDiffPercent(diffSession, sessionBase) * 100;
- }
- if (pktBase > 0) {
- diffPktPercent = getDiffPercent(diffPkt, pktBase) * 100;
- }
- if (bitBase > 0) {
- diffBitPercent = getDiffPercent(diffByte, bitBase) * 100;
- }
-
- long profileId = 0;
- DosEventLog result = null;
-
- if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent) {
- profileId = threshold.getId();
- result = getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
- } else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent) {
- profileId = threshold.getId();
- result = getDosEventLog(value, pktBase, diffPkt, profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
- } else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent) {
- profileId = threshold.getId();
- result = getDosEventLog(value, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
- }
- return result;
- }
-
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {
DosEventLog result = null;
- String destinationIp = value.getDestination_ip();
+ String destinationIp = value.getServer_ip();
String attackType = value.getAttack_type();
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
@@ -207,7 +126,7 @@ public class DosDetectionFunction extends ProcessFunction> iterator = value.getClientips_countrys().entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry entry = iterator.next();
+ client_ips.append(entry.getKey());
+ client_countrys.append(entry.getValue());
+ if (iterator.hasNext()) {
+ client_ips.append(",");
+ client_countrys.append(",");
+ }
+ }
+ if(client_ips.length()>0){
+ dosEventLog.setSource_ip_list(client_ips.toString());
+ dosEventLog.setSource_country_list(client_countrys.toString());
+ }
+ dosEventLog.setSession_rate(value.getSession_rate());
+ dosEventLog.setPacket_rate(value.getPacket_rate());
+ dosEventLog.setBit_rate(value.getBit_rate());
+ dosEventLog.setBytes(value.getBytes());
+ dosEventLog.setSessions(value.getSessions());
+ dosEventLog.setPackets(value.getPkts());
return dosEventLog;
}
@@ -253,10 +195,10 @@ public class DosDetectionFunction extends ProcessFunction countrySet = new HashSet<>();
- for (String ip : ipArr) {
- String country = ipLookupUtils.getCountryLookup(ip);
- if (StringUtil.isNotBlank(country)) {
- countrySet.add(country);
- }
- }
- countryList = StringUtils.join(countrySet, ", ");
- return countryList;
- } catch (Exception e) {
- logger.error("{} source IP lists 获取国家失败", sourceIpList, e);
- return StringUtil.EMPTY;
- }
- } else {
- throw new IllegalArgumentException("Illegal Argument sourceIpList = null");
- }
- }
-
private int getCurrentTimeIndex(long sketchStartTime) {
int index = 0;
try {
diff --git a/src/main/java/com/zdjizhi/function/DosMetricsRichFunction.java b/src/main/java/com/zdjizhi/function/DosMetricsRichFunction.java
new file mode 100644
index 0000000..b2a5000
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/DosMetricsRichFunction.java
@@ -0,0 +1,64 @@
+package com.zdjizhi.function;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.geedgenetworks.utils.StringUtil;
+import com.zdjizhi.common.DosMetricsLog;
+import com.zdjizhi.common.DosSketchLog;
+import com.zdjizhi.common.pojo.DosSketchMetricsLog;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.zdjizhi.conf.DosConfigs.DESTINATION_IP_PARTITION_NUM;
+import static com.zdjizhi.conf.DosConfigs.FLINK_WINDOW_MAX_TIME;
+
+public class DosMetricsRichFunction extends RichFlatMapFunction {
+ private static final Logger logger = LoggerFactory.getLogger(DosMetricsRichFunction.class);
+
+ private Configuration configuration;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ configuration = (Configuration) getRuntimeContext()
+ .getExecutionConfig().getGlobalJobParameters();
+ }
+
+ @Override
+ public void flatMap(DosSketchLog dosSketchLog, Collector out) throws Exception {
+ try {
+
+ if(dosSketchLog.getRule_id()==0) {
+ DosMetricsLog dosMetricsLog = new DosMetricsLog();
+ dosMetricsLog.setSketch_start_time(dosSketchLog.getStart_timestamp_ms() / 1000);
+ dosMetricsLog.setDestination_ip(dosSketchLog.getServer_ip());
+ dosMetricsLog.setAttack_type(dosSketchLog.getAttack_type());
+ dosMetricsLog.setSession_rate(dosSketchLog.getSession_rate());
+ dosMetricsLog.setPacket_rate(dosSketchLog.getPacket_rate());
+ dosMetricsLog.setBit_rate(dosSketchLog.getBit_rate());
+ dosMetricsLog.setVsys_id(dosSketchLog.getVsys_id());
+ dosMetricsLog.setPartition_num(getPartitionNumByIp(dosSketchLog.getServer_ip()));
+ String jsonString = JSON.toJSONString(dosMetricsLog);
+ logger.debug("metric 结果已加载:{}", jsonString);
+ out.collect(jsonString);
+ }
+ } catch (Exception e) {
+ logger.error("数据解析错误:", e);
+ }
+ }
+ private long timeFloor(long sketchStartTime) {
+ return sketchStartTime / configuration.get(FLINK_WINDOW_MAX_TIME) * configuration.get(FLINK_WINDOW_MAX_TIME);
+ }
+
+ private int getPartitionNumByIp(String destinationIp) {
+ return Math.abs(destinationIp.hashCode()) % configuration.get(DESTINATION_IP_PARTITION_NUM);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/function/EtlProcessFunction.java b/src/main/java/com/zdjizhi/function/EtlProcessFunction.java
deleted file mode 100644
index 5cb5e4d..0000000
--- a/src/main/java/com/zdjizhi/function/EtlProcessFunction.java
+++ /dev/null
@@ -1,141 +0,0 @@
-package com.zdjizhi.function;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.DosMetricsLog;
-import com.zdjizhi.common.DosSketchLog;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-import java.util.HashSet;
-
-import static com.zdjizhi.conf.DosConfigs.*;
-
-/**
- * @author 94976
- */
-public class EtlProcessFunction extends ProcessWindowFunction, TimeWindow> {
-
- private static final Log logger = LogFactory.get();
- private final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
- private final String EMPTY_SOURCE_IP_IPV6 = "::";
- public static OutputTag outputTag = new OutputTag("traffic server ip metrics") {
- };
- private Configuration configuration;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- configuration = (Configuration) getRuntimeContext()
- .getExecutionConfig().getGlobalJobParameters();
- }
-
- @Override
- public void process(Tuple3 keys,
- Context context, Iterable elements,
- Collector out) {
- DosSketchLog middleResult = getMiddleResult(keys, elements);
- try {
- if (middleResult != null) {
- out.collect(middleResult);
- logger.debug("获取中间聚合结果:{}", middleResult.toString());
- context.output(outputTag, getOutputMetric(middleResult));
- }
- } catch (Exception e) {
- logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e);
- }
- }
-
-
- private DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) {
- DosMetricsLog dosMetricsLog = new DosMetricsLog();
- dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis() / 1000));
- dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip());
- dosMetricsLog.setAttack_type(midResuleLog.getAttack_type());
- dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions());
- dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets());
- dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes());
- dosMetricsLog.setVsys_id(midResuleLog.getVsys_id());
- dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip()));
- logger.debug("metric 结果已加载:{}", dosMetricsLog.toString());
- return dosMetricsLog;
- }
-
- private long timeFloor(long sketchStartTime) {
- return sketchStartTime / configuration.get(FLINK_WINDOW_MAX_TIME) * configuration.get(FLINK_WINDOW_MAX_TIME);
- }
-
- private int getPartitionNumByIp(String destinationIp) {
- return Math.abs(destinationIp.hashCode()) % configuration.get(DESTINATION_IP_PARTITION_NUM);
- }
-
- private DosSketchLog getMiddleResult(Tuple3 keys, Iterable elements) {
-
- DosSketchLog midResuleLog = new DosSketchLog();
- Tuple7 values = sketchAggregate(elements);
- try {
- if (values != null) {
- midResuleLog.setAttack_type(keys.f0);
- midResuleLog.setDestination_ip(keys.f1);
- midResuleLog.setVsys_id(keys.f2);
- midResuleLog.setSketch_start_time(values.f4);
- midResuleLog.setSketch_duration(values.f5);
- midResuleLog.setSource_ip(values.f3);
- midResuleLog.setSketch_sessions(values.f0);
- midResuleLog.setSketch_packets(values.f1);
- midResuleLog.setSketch_bytes(values.f2);
- midResuleLog.setCommon_recv_time(values.f6);
- return midResuleLog;
- }
- } catch (Exception e) {
- logger.error("加载中间结果集失败,keys: {} values: {}\n{}", keys, values, e);
- }
- return null;
- }
-
- private Tuple7 sketchAggregate(Iterable elements) {
- long sessions = 0;
- long packets = 0;
- long bytes = 0;
- long startTime = System.currentTimeMillis() / 1000;
- long endTime = System.currentTimeMillis() / 1000;
- long duration = 0;
- long recvtime = 0;
- HashSet sourceIpSet = new HashSet<>();
- try {
- for (DosSketchLog newSketchLog : elements) {
- if (recvtime == 0) {
- recvtime = newSketchLog.getCommon_recv_time();
- } else if (recvtime > newSketchLog.getCommon_recv_time()) {
- recvtime = newSketchLog.getCommon_recv_time();
- }
- String sourceIp = newSketchLog.getSource_ip();
- if (StringUtils.equals(sourceIp, EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp, EMPTY_SOURCE_IP_IPV6)) {
- sessions += newSketchLog.getSketch_sessions();
- packets += newSketchLog.getSketch_packets();
- bytes += newSketchLog.getSketch_bytes();
- startTime = newSketchLog.getSketch_start_time() > startTime ? startTime : newSketchLog.getSketch_start_time();
- endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime;
- duration = endTime - startTime == 0 ? 5 : endTime - startTime;
- } else {
- if (sourceIpSet.size() < configuration.get(SOURCE_IP_LIST_LIMIT)) {
- sourceIpSet.add(sourceIp);
- }
- }
- }
- String sourceIpList = StringUtils.join(sourceIpSet, ",");
- return Tuple7.of(sessions / configuration.get(FLINK_WINDOW_MAX_TIME), packets / configuration.get(FLINK_WINDOW_MAX_TIME),
- bytes * 8 / configuration.get(FLINK_WINDOW_MAX_TIME), sourceIpList, startTime, duration, recvtime);
- } catch (Exception e) {
- logger.error("聚合中间结果集失败 {}", e);
- }
- return null;
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/function/FlatSketchFunction.java b/src/main/java/com/zdjizhi/function/FlatSketchFunction.java
index 4b3777b..5c1de1d 100644
--- a/src/main/java/com/zdjizhi/function/FlatSketchFunction.java
+++ b/src/main/java/com/zdjizhi/function/FlatSketchFunction.java
@@ -3,6 +3,7 @@ package com.zdjizhi.function;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.utils.StringUtil;
import com.zdjizhi.common.DosSketchLog;
+import com.zdjizhi.common.pojo.DosSketchMetricsLog;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
@@ -10,44 +11,58 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Map;
public class FlatSketchFunction implements FlatMapFunction {
private static Logger logger = LoggerFactory.getLogger(FlatSketchFunction.class);
@Override
public void flatMap(String value, Collector out) {
+
try {
if (StringUtil.isNotBlank(value)) {
- final long recv_time = System.currentTimeMillis()/1000;
- HashMap sketchSource = JSONObject.parseObject(value, HashMap.class);
- long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
- long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
- String attackType = sketchSource.get("attack_type").toString();
- int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
- String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list"));
- ArrayList> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class);
- for (HashMap obj : reportIpList) {
- DosSketchLog dosSketchLog = new DosSketchLog();
- dosSketchLog.setCommon_recv_time(recv_time);
- dosSketchLog.setSketch_start_time(sketchStartTime);
- dosSketchLog.setSketch_duration(sketchDuration);
- dosSketchLog.setAttack_type(attackType);
- dosSketchLog.setVsys_id(vsysId);
- String sourceIp = obj.get("source_ip").toString();
- String destinationIp = obj.get("destination_ip").toString();
- long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
- long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
- long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
- dosSketchLog.setSource_ip(sourceIp);
- dosSketchLog.setDestination_ip(destinationIp);
- dosSketchLog.setSketch_sessions(sketchSessions);
- dosSketchLog.setSketch_packets(sketchPackets);
- dosSketchLog.setSketch_bytes(sketchBytes);
- out.collect(dosSketchLog);
- logger.debug("数据解析成功:{}", dosSketchLog);
+ DosSketchLog dosSketchLog = new DosSketchLog();
+ dosSketchLog.setRecv_time(System.currentTimeMillis()/1000);
+ DosSketchMetricsLog dosSketchMetricsLog = JSONObject.parseObject(value, DosSketchMetricsLog.class);
+ dosSketchLog.setVsys_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("vsys_id", "1")));
+ dosSketchLog.setServer_ip(dosSketchMetricsLog.getTags().getOrDefault("server_ip", ""));
+ dosSketchLog.setDecoded_as(dosSketchMetricsLog.getTags().getOrDefault("decoded_as", ""));
+ dosSketchLog.setDuration(Long.parseLong(dosSketchMetricsLog.getTags().getOrDefault("duration","60000")));
+ dosSketchLog.setTimestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
+ dosSketchLog.setStart_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms());
+ dosSketchLog.setEnd_timestamp_ms(dosSketchMetricsLog.getTimestamp_ms() + dosSketchLog.getDuration());
+ dosSketchLog.setClient_ip(dosSketchMetricsLog.getTags().getOrDefault("client_ip", ""));
+ dosSketchLog.setData_center(dosSketchMetricsLog.getTags().getOrDefault("data_center", ""));
+ dosSketchLog.setDevice_id(dosSketchMetricsLog.getTags().getOrDefault("device_id", ""));
+ dosSketchLog.setDevice_group(dosSketchMetricsLog.getTags().getOrDefault("device_group", ""));
+ dosSketchLog.setServer_country(dosSketchMetricsLog.getTags().getOrDefault("server_country", ""));
+ dosSketchLog.setClient_country(dosSketchMetricsLog.getTags().getOrDefault("client_country", ""));
+ dosSketchLog.setRule_id(Integer.parseInt(dosSketchMetricsLog.getTags().getOrDefault("rule_id", "0")));
+ dosSketchLog.setName(dosSketchMetricsLog.getTags().getOrDefault("name", ""));
+
+ Map clientips_countrys = new HashMap<>();
+ dosSketchLog.setClientips_countrys(clientips_countrys);
+ if("top_client_and_server_ip".equals(dosSketchMetricsLog.getName())){
+ dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
+ dosSketchLog.setBytes(dosSketchMetricsLog.getFields().getOrDefault("bytes",0L));
+ dosSketchLog.setSessions(dosSketchMetricsLog.getFields().getOrDefault("sessions",0L));
+ clientips_countrys.put(dosSketchLog.getClient_ip(),dosSketchLog.getClient_country());
}
+ else if("top_client_ip_and_server_ip".equals(dosSketchMetricsLog.getName())){
+ dosSketchLog.setPkts(0);
+ dosSketchLog.setBytes(0);
+ dosSketchLog.setSessions(0);
+ clientips_countrys.put(dosSketchLog.getClient_ip(),dosSketchLog.getClient_country());
+ }
+ else {
+ dosSketchLog.setPkts(dosSketchMetricsLog.getFields().getOrDefault("pkts",0L));
+ dosSketchLog.setBytes(dosSketchMetricsLog.getFields().getOrDefault("bytes",0L));
+ dosSketchLog.setSessions(dosSketchMetricsLog.getFields().getOrDefault("sessions",0L));
+ }
+ out.collect(dosSketchLog);
}
} catch (Exception e) {
logger.error("数据解析错误:{} \n{}", value, e);
}
+
}
}
diff --git a/src/main/java/com/zdjizhi/function/MetricsAggregationReduce.java b/src/main/java/com/zdjizhi/function/MetricsAggregationReduce.java
new file mode 100644
index 0000000..6148755
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/MetricsAggregationReduce.java
@@ -0,0 +1,25 @@
+package com.zdjizhi.function;
+
+import com.zdjizhi.common.DosSketchLog;
+import org.apache.flink.api.common.functions.ReduceFunction;
+
+public class MetricsAggregationReduce implements ReduceFunction {
+
+ @Override
+ public DosSketchLog reduce(DosSketchLog value1, DosSketchLog value2) throws Exception {
+ value1.setPkts(value1.getPkts() + value2.getPkts());
+ value1.setBytes(value1.getBytes() + value2.getBytes());
+ value1.setSessions(value1.getSessions() + value2.getSessions());
+ if (value1.getRecv_time() > value2.getRecv_time()) {
+ value1.setRecv_time(value2.getRecv_time());
+ }
+ if (value1.getStart_timestamp_ms() > value2.getStart_timestamp_ms()) {
+ value1.setStart_timestamp_ms(value2.getStart_timestamp_ms());
+ }
+ if (value1.getEnd_timestamp_ms() < value2.getEnd_timestamp_ms()) {
+ value1.setEnd_timestamp_ms(value2.getEnd_timestamp_ms());
+ }
+ value1.getClientips_countrys().putAll((value2.getClientips_countrys()));
+ return value1;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/function/MetricsCalculate.java b/src/main/java/com/zdjizhi/function/MetricsCalculate.java
new file mode 100644
index 0000000..a78dce8
--- /dev/null
+++ b/src/main/java/com/zdjizhi/function/MetricsCalculate.java
@@ -0,0 +1,43 @@
+package com.zdjizhi.function;
+
+import com.zdjizhi.common.DosSketchLog;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricsCalculate extends ProcessWindowFunction<
+ DosSketchLog, // 输入类型
+ DosSketchLog, // 输出类型
+ Tuple4, // 键类型
+ TimeWindow> { // 窗口类型
+ private final Map attackTypeMapping = new HashMap<>();
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ attackTypeMapping.put("TCP SYN","TCP SYN Flood");
+ attackTypeMapping.put("DNS","UDP Flood");
+ attackTypeMapping.put("ICMP","ICMP Flood");
+ attackTypeMapping.put("UDP","DNS Flood");
+ attackTypeMapping.put("NTP","NTP Flood");
+ attackTypeMapping.put("","Custom Network Attack");
+ }
+
+ @Override
+ public void process(Tuple4 key, ProcessWindowFunction, TimeWindow>.Context context, Iterable elements, Collector out) throws Exception {
+
+ for (DosSketchLog dosSketchLog: elements){
+ dosSketchLog.setSession_rate(dosSketchLog.getSessions()/ (dosSketchLog.getDuration()/1000) );
+ dosSketchLog.setPacket_rate(dosSketchLog.getPkts()/(dosSketchLog.getDuration()/1000));
+ dosSketchLog.setBit_rate(dosSketchLog.getBytes()/(dosSketchLog.getDuration()/1000));
+ dosSketchLog.setAttack_type(attackTypeMapping.getOrDefault(dosSketchLog.getDecoded_as(),""));
+ out.collect(dosSketchLog);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/function/SketchKeysSelector.java b/src/main/java/com/zdjizhi/function/SketchKeysSelector.java
index edc3f0a..5d3389b 100644
--- a/src/main/java/com/zdjizhi/function/SketchKeysSelector.java
+++ b/src/main/java/com/zdjizhi/function/SketchKeysSelector.java
@@ -2,14 +2,15 @@ package com.zdjizhi.function;
import com.zdjizhi.common.DosSketchLog;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
-public class SketchKeysSelector implements KeySelector> {
+public class SketchKeysSelector implements KeySelector> {
@Override
- public Tuple3 getKey(DosSketchLog dosSketchLog){
- return Tuple3.of(
- dosSketchLog.getAttack_type(),
- dosSketchLog.getDestination_ip(),
- dosSketchLog.getVsys_id());
+ public Tuple4 getKey(DosSketchLog dosSketchLog){
+ return Tuple4.of(
+ dosSketchLog.getDecoded_as(),
+ dosSketchLog.getServer_ip(),
+ dosSketchLog.getVsys_id(),
+ dosSketchLog.getRule_id());
}
}
diff --git a/src/main/java/com/zdjizhi/main/DosDetectionApplication.java b/src/main/java/com/zdjizhi/main/DosDetectionApplication.java
index 152cb0f..01e54c5 100644
--- a/src/main/java/com/zdjizhi/main/DosDetectionApplication.java
+++ b/src/main/java/com/zdjizhi/main/DosDetectionApplication.java
@@ -2,16 +2,15 @@ package com.zdjizhi.main;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.DosEventLog;
+import com.zdjizhi.common.DosMetricsLog;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.conf.DosConfiguration;
-import com.zdjizhi.function.DosDetectionFunction;
-import com.zdjizhi.function.EtlProcessFunction;
-import com.zdjizhi.function.FlatSketchFunction;
-import com.zdjizhi.function.SketchKeysSelector;
+import com.zdjizhi.function.*;
import com.zdjizhi.utils.connections.kafka.KafkaConsumer;
import com.zdjizhi.utils.connections.kafka.KafkaProducer;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -40,7 +39,6 @@ public class DosDetectionApplication {
"\nUsage: flink -c xxx xxx.jar app.properties.");
}
final ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
-
final Configuration config = tool.getConfiguration();
env.getConfig().setGlobalJobParameters(config);
@@ -51,34 +49,36 @@ public class DosDetectionApplication {
.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX))).setParallelism(config.get(SOURCE_PARALLELISM));
//Watermark settings
- final WatermarkStrategy dosSketchLogWatermarkStrategy = WatermarkStrategy.
+ WatermarkStrategy dosSketchLogWatermarkStrategy = WatermarkStrategy.
forBoundedOutOfOrderness(Duration.ofSeconds(config.get(FLINK_WATERMARK_MAX_ORDERNESS)))
- .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
+ .withTimestampAssigner((event, timestamp) -> event.getTimestamp_ms() );
//Data preprocessing
- final SingleOutputStreamOperator sketchSource = dosStreamSource.flatMap(new FlatSketchFunction())
+ SingleOutputStreamOperator sketchSource = dosStreamSource.flatMap(new FlatSketchFunction()).setParallelism(1)
.assignTimestampsAndWatermarks(dosSketchLogWatermarkStrategy);
- //windowed aggregation
- final SingleOutputStreamOperator middleStream = sketchSource.keyBy(new SketchKeysSelector())
- .window(TumblingEventTimeWindows.of(Time.seconds(config.get(FLINK_WINDOW_MAX_TIME)))).process(new EtlProcessFunction())
+
+ SingleOutputStreamOperator serverIpMetrics=sketchSource.keyBy(new SketchKeysSelector())
+ .window(TumblingEventTimeWindows.of(Time.seconds(config.get(FLINK_WINDOW_MAX_TIME)))).reduce(new MetricsAggregationReduce(), new MetricsCalculate())
.setParallelism(config.get(Flink_FIRST_AGG_PATALLELISM));
//dos detection
- final SingleOutputStreamOperator dosEventLogOutputStream = middleStream.process(new DosDetectionFunction())
+ SingleOutputStreamOperator dosEventLogOutputStream = serverIpMetrics.process(new DosDetectionFunction())
.setParallelism(config.get(FLINK_DETECTION_MAP_PARALLELISM));
+ SingleOutputStreamOperator dosMetricsLogOutputStream = serverIpMetrics.flatMap(new DosMetricsRichFunction())
+ .setParallelism(config.get(FLINK_DETECTION_MAP_PARALLELISM));
+
+
+ dosMetricsLogOutputStream.addSink(KafkaProducer.getKafkaProducer(config.get(KAFKA_SINK_METRIC_TOPIC), DosConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)))
+ .setParallelism(config.get(KAFKA_SINK_METRIC_PARALLELISM));
+
//dos event output
dosEventLogOutputStream.filter(Objects::nonNull)
.map(JSONObject::toJSONString)
.addSink(KafkaProducer.getKafkaProducer(config.get(KAFKA_SINK_EVENT_TOPIC), DosConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)))
.setParallelism(config.get(KAFKA_SINK_EVENT_PARALLELISM));
- //traffic server ip metrics output
- middleStream.getSideOutput(EtlProcessFunction.outputTag).map(JSONObject::toJSONString)
- .addSink(KafkaProducer.getKafkaProducer(config.get(KAFKA_SINK_METRIC_TOPIC), DosConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)))
- .setParallelism(config.get(KAFKA_SINK_METRIC_PARALLELISM));
-
env.execute(config.get(JOB_NAME));
}
}
diff --git a/src/main/java/com/zdjizhi/utils/Threshold/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/utils/Threshold/ParseStaticThreshold.java
deleted file mode 100644
index 54b7169..0000000
--- a/src/main/java/com/zdjizhi/utils/Threshold/ParseStaticThreshold.java
+++ /dev/null
@@ -1,283 +0,0 @@
-package com.zdjizhi.utils.Threshold;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.zdjizhi.common.DosDetectionThreshold;
-import com.zdjizhi.common.DosVsysId;
-import com.zdjizhi.utils.connections.http.HttpClientService;
-import inet.ipaddr.IPAddress;
-import inet.ipaddr.IPAddressString;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
-import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.message.BasicHeader;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static com.zdjizhi.conf.DosConfigs.*;
-
-/**
- * @author wlh
- */
-public class ParseStaticThreshold {
- private static final Log logger = LogFactory.get();
- public Configuration configuration;
- private String encryptpwd;
- private HttpClientService httpClientService;
-
-
- public ParseStaticThreshold(Configuration configuration, HttpClientService httpClientService) {
- this.configuration = configuration;
- this.httpClientService = httpClientService;
- }
-
- /**
- * 获取加密密码
- */
- private String getEncryptpwd() {
- String psw = httpClientService.ERROR_MESSAGE;
- try {
- URIBuilder uriBuilder = new URIBuilder(configuration.get(BIFANG_SERVER_URI));
- HashMap parms = new HashMap<>();
- parms.put("password", "admin");
- httpClientService.setUrlWithParams(uriBuilder, configuration.get(BIFANG_SERVER_ENCRYPTPWD_PATH), parms);
- String resposeJsonStr = httpClientService.httpGet(uriBuilder.build(),configuration.get(HTTP_SOCKET_TIMEOUT));
- if (!httpClientService.ERROR_MESSAGE.equals(resposeJsonStr)) {
- HashMap resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
- boolean success = (boolean) resposeMap.get("success");
- String msg = resposeMap.get("msg").toString();
-
- if (success) {
- HashMap data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
- psw = data.get("encryptpwd").toString();
- } else {
- logger.error(msg);
- }
- }
- } catch (URISyntaxException e) {
- logger.error("构造URI异常", e);
- } catch (Exception e) {
- logger.error("获取encryptpwd失败", e);
- }
- return psw;
- }
-
-
- /**
- * 获取vsysId配置列表
- *
- * @return vsysIdList
- */
- private ArrayList getVsysId() {
- ArrayList vsysIdList = null;
- try {
- URIBuilder uriBuilder = new URIBuilder(configuration.get(BIFANG_SERVER_URI));
- HashMap parms = new HashMap<>();
- parms.put("page_size", -1);
-// parms.put("orderBy", "vsysId desc");
- parms.put("type", 1);
- httpClientService.setUrlWithParams(uriBuilder, configuration.get(BIFANG_SERVER_POLICY_VSYSID_PATH), parms);
- String token = configuration.get(BIFANG_SERVER_TOKEN);
- if (!httpClientService.ERROR_MESSAGE.equals(token)) {
- BasicHeader authorization = new BasicHeader("Authorization", token);
- BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
- String resposeJsonStr = httpClientService.httpGet(uriBuilder.build(),configuration.get(HTTP_SOCKET_TIMEOUT),authorization, authorization1);
- if (!httpClientService.ERROR_MESSAGE.equals(resposeJsonStr)) {
- HashMap resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
- boolean success = (boolean) resposeMap.get("success");
- String msg = resposeMap.get("msg").toString();
- if (success) {
- HashMap data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
- Object list = data.get("vsys_list");
- if (list != null) {
- List dosVsysIds = JSON.parseArray(JSONObject.toJSONString(list), DosVsysId.class);
- vsysIdList= (ArrayList)dosVsysIds;
- logger.info("获取到vsysId {}条", vsysIdList.size());
- } else {
- logger.warn("vsysIdList为空");
- }
- } else {
- logger.error(msg);
- }
- }
- }
- } catch (Exception e) {
- logger.error("获取vsysId失败,请检查bifang服务或登录配置信息 ", e);
- }
- return vsysIdList;
- }
-
- /**
- * 根据vsysId获取静态阈值配置列表
- * @return thresholds
- */
- private ArrayList getDosDetectionThreshold() {
- ArrayList vsysThresholds = new ArrayList<>();
- ArrayList vsysIds = getVsysId();
- try {
- if (vsysIds != null) {
- for (DosVsysId dosVsysId : vsysIds) {
- Integer vsysId = dosVsysId.getId() == null ? 1 : dosVsysId.getId();
- Integer[] superiorIds = dosVsysId.getSuperior_ids();
- URIBuilder uriBuilder = new URIBuilder(configuration.get(BIFANG_SERVER_URI));
- HashMap parms = new HashMap<>();
- parms.put("page_size", -1);
-// parms.put("order_by", "profileId asc");
- parms.put("is_valid", 1);
- parms.put("vsys_id", vsysId);
- httpClientService.setUrlWithParams(uriBuilder, configuration.get(BIFANG_SERVER_POLICY_THRESHOLD_PATH), parms);
- String token = configuration.get(BIFANG_SERVER_TOKEN);
- if (!httpClientService.ERROR_MESSAGE.equals(token)) {
- BasicHeader authorization = new BasicHeader("Authorization", token);
- BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
- String resposeJsonStr = httpClientService.httpGet(uriBuilder.build(),configuration.get(HTTP_SOCKET_TIMEOUT),authorization, authorization1);
- if (!httpClientService.ERROR_MESSAGE.equals(resposeJsonStr)) {
- HashMap resposeMap = JSONObject.parseObject(resposeJsonStr,HashMap.class);
- boolean success = (boolean) resposeMap.get("success");
- String msg = resposeMap.get("msg").toString();
- if (success) {
- HashMap data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
- Object list = data.get("dos_detections");
- if (list != null) {
- List dosDetectionThresholds = JSON.parseArray(JSONObject.toJSONString(list), DosDetectionThreshold.class);
- ArrayList thresholds = (ArrayList)dosDetectionThresholds;
- for (DosDetectionThreshold dosDetectionThreshold : thresholds) {
- dosDetectionThreshold.setSuperior_ids(superiorIds);
- vsysThresholds.add(dosDetectionThreshold);
- }
- logger.info("获取到vsys id是{}静态阈值配置{}条", vsysId, thresholds.size());
- } else {
- logger.warn("静态阈值配置为空");
- }
- } else {
- logger.error(msg);
- }
- }
- }
- }
- }
- } catch (Exception e) {
- logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e);
- }
- return vsysThresholds;
- }
-
- /**
- * 基于静态阈值构建threshold RangeMap,k:IP段或具体IP,v:配置信息
- *
- * @return threshold RangeMap
- */
- public HashMap>> createStaticThreshold() {
- HashMap>> thresholdRangeMap = new HashMap<>(4);
- try {
- ArrayList dosDetectionThreshold = getDosDetectionThreshold();
- if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
- for (DosDetectionThreshold threshold : dosDetectionThreshold) {
-
- String attackType = threshold.getAttack_type();
- int vsysId = threshold.getVsys_id();
- HashMap> rangeMap = thresholdRangeMap.getOrDefault(vsysId, new HashMap<>());
- TreeRangeMap treeRangeMap = rangeMap.getOrDefault(attackType, TreeRangeMap.create());
- ArrayList serverIpList = threshold.getServer_ip_list();
-
- for (String sip : serverIpList) {
- IPAddressString ipAddressString = new IPAddressString(sip);
- if (ipAddressString.isIPAddress()) {
- IPAddress address = ipAddressString.getAddress();
- if (address.isPrefixed()) {
- IPAddress lower = address.getLower();
- IPAddress upper = address.getUpper();
- if (!address.isMultiple()) {
- lower = address.adjustPrefixLength(address.getBitCount());
- upper = address.toMaxHost().withoutPrefixLength();
- }
- Map.Entry, DosDetectionThreshold> lowerEntry = treeRangeMap.getEntry(lower);
- Map.Entry, DosDetectionThreshold> upperEntry = treeRangeMap.getEntry(upper);
- if (lowerEntry != null && upperEntry == null) {
- Range lowerEntryKey = lowerEntry.getKey();
- DosDetectionThreshold lowerEntryValue = lowerEntry.getValue();
- treeRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue);
- treeRangeMap.put(Range.closed(lower, upper), threshold);
- } else if (lowerEntry == null && upperEntry != null) {
- Range upperEntryKey = upperEntry.getKey();
- DosDetectionThreshold upperEntryValue = upperEntry.getValue();
- treeRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue);
- treeRangeMap.put(Range.closed(lower, upper), threshold);
- } else {
- treeRangeMap.put(Range.closed(lower, upper), threshold);
- }
- } else {
- treeRangeMap.put(Range.closed(address, address), threshold);
- }
- }
- }
- rangeMap.put(attackType, treeRangeMap);
- thresholdRangeMap.put(vsysId, rangeMap);
- }
- }
- } catch (Exception e) {
- logger.error("构建threshold RangeMap失败", e);
- }
- return thresholdRangeMap;
- }
-
-
-
- /**
- * 登录bifang服务,获取token
- *
- * @return token
- */
- private String loginBifangServer() {
- String token = httpClientService.ERROR_MESSAGE;
- try {
- final HashMap parmsMap = new HashMap<>();
- String urlString = configuration.get(BIFANG_SERVER_URI)+configuration.get(BIFANG_SERVER_LOGIN_PATH);
- parmsMap.put("username","admin");
- parmsMap.put("password",encryptpwd);
- parmsMap.put("auth_mode","");
- final String jsonInputString = JSON.toJSONString(parmsMap);
- final URL url = new URL(urlString);
- final HttpURLConnection connection = (HttpURLConnection)url.openConnection();
- connection.setRequestMethod("POST");
- connection.setRequestProperty("Content-Type", "application/json");
- connection.setRequestProperty("Accept", "application/json");
- connection.setDoOutput(true);
- OutputStream os = connection.getOutputStream();
- os.write(jsonInputString.getBytes());
- os.flush();
- os.close();
- int responseCode = connection.getResponseCode();
- if (responseCode == 200 ) {
- // 读取响应内容
- BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
- String line;
- StringBuilder response = new StringBuilder();
- while ((line = reader.readLine()) != null) {
- response.append(line);
- }
- reader.close();
- HashMap body = JSONObject.parseObject(String.valueOf(response), HashMap.class);
- final HashMap data = JSONObject.parseObject(String.valueOf( body.get("data")), HashMap.class);
- token = (String) data.get("token");
- }
- } catch (Exception e) {
- logger.error("登录失败,未获取到token ", e);
- }
-
- return token;
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/connections/http/HttpClientService.java b/src/main/java/com/zdjizhi/utils/connections/http/HttpClientService.java
deleted file mode 100644
index 413350f..0000000
--- a/src/main/java/com/zdjizhi/utils/connections/http/HttpClientService.java
+++ /dev/null
@@ -1,340 +0,0 @@
-package com.zdjizhi.utils.connections.http;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-
-import com.geedgenetworks.utils.StringUtil;
-import com.zdjizhi.utils.exception.FlowWriteException;
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.http.*;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.HttpRequestRetryHandler;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.config.Registry;
-import org.apache.http.config.RegistryBuilder;
-import org.apache.http.conn.ConnectTimeoutException;
-import org.apache.http.conn.ConnectionKeepAliveStrategy;
-import org.apache.http.conn.socket.ConnectionSocketFactory;
-import org.apache.http.conn.socket.PlainConnectionSocketFactory;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-import org.apache.http.message.BasicHeaderElementIterator;
-import org.apache.http.protocol.HTTP;
-import org.apache.http.util.EntityUtils;
-
-import javax.net.ssl.*;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.X509Certificate;
-import java.util.Map;
-
-import static com.zdjizhi.conf.DosConfigs.*;
-
-public class HttpClientService {
- private static final Log logger = LogFactory.get();
- public static final String ERROR_MESSAGE = "-1";
- private Configuration configuration;
-
- public HttpClientService(Configuration configuration) {
- this.configuration = configuration;
- }
-
- /**
- * 在调用SSL之前需要重写验证方法,取消检测SSL
- * 创建ConnectionManager,添加Connection配置信息
- *
- * @return HttpClient 支持https
- */
- private PoolingHttpClientConnectionManager getSslClientManager() {
- try {
- // 在调用SSL之前需要重写验证方法,取消检测SSL
- X509TrustManager trustManager = new X509TrustManager() {
- @Override
- public X509Certificate[] getAcceptedIssuers() {
- return null;
- }
-
- @Override
- public void checkClientTrusted(X509Certificate[] xcs, String str) {
- }
-
- @Override
- public void checkServerTrusted(X509Certificate[] xcs, String str) {
- }
- };
- SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
- ctx.init(null, new TrustManager[]{trustManager}, null);
- SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE);
- Registry socketFactoryRegistry = RegistryBuilder.create()
- .register("http", PlainConnectionSocketFactory.INSTANCE)
- .register("https", socketFactory).build();
- // 创建ConnectionManager,添加Connection配置信息
- PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
- // 设置最大连接数
- connManager.setMaxTotal(configuration.get(HTTP_POOL_MAX_CONNECTION));
- // 设置每个连接的路由数
- connManager.setDefaultMaxPerRoute(configuration.get(HTTP_POOL_MAX_PER_ROUTE));
- return connManager;
- } catch (KeyManagementException | NoSuchAlgorithmException e) {
- throw new FlowWriteException(e.getMessage());
- }
- }
-
- /**
- * 获取Http客户端连接对象
- *
- * @param socketTimeOut 响应超时时间
- * @return Http客户端连接对象
- */
- private CloseableHttpClient getHttpClient(int socketTimeOut) {
- // 创建Http请求配置参数
- RequestConfig requestConfig = RequestConfig.custom()
- // 获取连接超时时间
- .setConnectionRequestTimeout(configuration.get(HTTP_POOL_REQUEST_TIMEOUT))
- // 请求超时时间
- .setConnectTimeout(configuration.get(HTTP_POOL_CONNECT_TIMEOUT))
- // 响应超时时间
- .setSocketTimeout(socketTimeOut)
- .build();
-
- /**
- * 测出超时重试机制为了防止超时不生效而设置
- * 如果直接放回false,不重试
- * 这里会根据情况进行判断是否重试
- */
- HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
- if (executionCount >= 3) {// 如果已经重试了3次,就放弃
- return false;
- }
- if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
- return true;
- }
- if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
- return false;
- }
- if (exception instanceof SocketTimeoutException) {
- if (exception.getMessage().contains("Read timed out")) {
- return false;
- }
- }
- if (exception instanceof UnknownHostException) {// 目标服务器不可达
- return false;
- }
- if (exception instanceof ConnectTimeoutException) {// 连接被拒绝
- return false;
- }
- if (exception instanceof SSLException) {// ssl握手异常
- return false;
- }
- if (exception instanceof InterruptedIOException) {// 超时
- return true;
- }
-
-
- HttpClientContext clientContext = HttpClientContext.adapt(context);
- HttpRequest request = clientContext.getRequest();
- // 如果请求是幂等的,就再次尝试
- if (!(request instanceof HttpEntityEnclosingRequest)) {
- return true;
- }
- return false;
- };
-
-
- ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
- HeaderElementIterator it = new BasicHeaderElementIterator
- (response.headerIterator(HTTP.CONN_KEEP_ALIVE));
- while (it.hasNext()) {
- HeaderElement he = it.nextElement();
- String param = he.getName();
- String value = he.getValue();
- if (value != null && param.equalsIgnoreCase("timeout")) {
- return Long.parseLong(value) * 1000;
- }
- }
- return 60 * 1000;//如果没有约定,则默认定义时长为60s
- };
-
- // 创建httpClient
- return HttpClients.custom()
- // 把请求相关的超时信息设置到连接客户端
- .setDefaultRequestConfig(requestConfig)
- // 把请求重试设置到连接客户端
- .setRetryHandler(retry)
- .setKeepAliveStrategy(myStrategy)
- // 配置连接池管理对象
- .setConnectionManager(getSslClientManager())
- .build();
- }
-
- public InputStream httpGetInputStream(String url, int socketTimeout, Header... headers) {
- InputStream result = null;
- // 获取客户端连接对象
- CloseableHttpClient httpClient = getHttpClient(socketTimeout);
- // 创建GET请求对象
- HttpGet httpGet = new HttpGet(url);
- if (StringUtil.isNotEmpty(headers)) {
- for (Header h : headers) {
- httpGet.addHeader(h);
- }
- }
- CloseableHttpResponse response = null;
-
- try {
- // 执行请求
- response = httpClient.execute(httpGet);
- // 获取响应实体
- result = IOUtils.toBufferedInputStream(response.getEntity().getContent());
- // 获取响应信息
- EntityUtils.consume(response.getEntity());
- } catch (ClientProtocolException e) {
- logger.error("current file: {},Protocol error:{}", url, e.getMessage());
-
- } catch (ParseException e) {
- logger.error("current file: {}, Parser error:{}", url, e.getMessage());
-
- } catch (IOException e) {
- logger.error("current file: {},IO error:{}", url, e.getMessage());
-
- } finally {
- if (null != response) {
- try {
- EntityUtils.consume(response.getEntity());
- response.close();
- } catch (IOException e) {
- logger.error("Release Connection error:{}", e.getMessage());
-
- }
- }
- return result;
- }
- }
-
-
- public byte[] httpGetByte(String url, int socketTimeout, Header... headers) {
- byte[] result = null;
- // 获取客户端连接对象
- CloseableHttpClient httpClient = getHttpClient(socketTimeout);
- // 创建GET请求对象
- HttpGet httpGet = new HttpGet(url);
- if (StringUtil.isNotEmpty(headers)) {
- for (Header h : headers) {
- httpGet.addHeader(h);
- }
- }
- CloseableHttpResponse response = null;
-
- try {
- // 执行请求
- response = httpClient.execute(httpGet);
- // 获取响应实体
- result = IOUtils.toByteArray(response.getEntity().getContent());
- // 获取响应信息
- EntityUtils.consume(response.getEntity());
- } catch (ClientProtocolException e) {
- logger.error("current file: {},Protocol error:{}", url, e.getMessage());
-
- } catch (ParseException e) {
- logger.error("current file: {}, Parser error:{}", url, e.getMessage());
-
- } catch (IOException e) {
- logger.error("current file: {},IO error:{}", url, e.getMessage());
-
- } finally {
- if (null != response) {
- try {
- EntityUtils.consume(response.getEntity());
- response.close();
- } catch (IOException e) {
- logger.error("Release Connection error:{}", e.getMessage());
-
- }
- }
- return result;
- }
- }
-
- /**
- * GET请求
- *
- * @param uri 请求地
- * @return message
- */
- public String httpGet(URI uri, int socketTimeout, Header... headers) {
- String msg = ERROR_MESSAGE;
-
- // 获取客户端连接对象
- CloseableHttpClient httpClient = getHttpClient(socketTimeout);
- CloseableHttpResponse response = null;
-
- try {
- logger.info("http get uri {}", uri);
- // 创建GET请求对象
- HttpGet httpGet = new HttpGet(uri);
-
- if (StringUtil.isNotEmpty(headers)) {
- for (Header h : headers) {
- httpGet.addHeader(h);
- logger.info("request header : {}", h);
- }
- }
- // 执行请求
- response = httpClient.execute(httpGet);
- int statusCode = response.getStatusLine().getStatusCode();
- // 获取响应实体
- HttpEntity entity = response.getEntity();
- // 获取响应信息
- msg = EntityUtils.toString(entity, "UTF-8");
-
- if (statusCode != HttpStatus.SC_OK) {
- logger.error("Http get content is :{}", msg);
- }
- } catch (ClientProtocolException e) {
- logger.error("协议错误: {}", e.getMessage());
- } catch (ParseException e) {
- logger.error("解析错误: {}", e.getMessage());
- } catch (IOException e) {
- logger.error("IO错误: {}", e.getMessage());
- } finally {
- if (null != response) {
- try {
- EntityUtils.consume(response.getEntity());
- response.close();
- } catch (IOException e) {
- logger.error("释放链接错误: {}", e.getMessage());
-
- }
- }
- }
-
- return msg;
- }
-
- public void setUrlWithParams(URIBuilder uriBuilder, String path, Map params) {
- try {
- uriBuilder.setPath(path);
- if (params != null && !params.isEmpty()) {
- for (Map.Entry kv : params.entrySet()) {
- uriBuilder.setParameter(kv.getKey(), kv.getValue().toString());
- }
- }
- } catch (Exception e) {
- logger.error("拼接url出错,uri : {}, path : {},参数: {}", uriBuilder.toString(), path, params);
- }
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/connections/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/connections/kafka/KafkaConsumer.java
index 01ef035..bd7d006 100644
--- a/src/main/java/com/zdjizhi/utils/connections/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/connections/kafka/KafkaConsumer.java
@@ -3,14 +3,25 @@ package com.zdjizhi.utils.connections.kafka;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Properties;
public class KafkaConsumer {
- public static FlinkKafkaConsumer getKafkaConsumer(String topic, Properties Properties){
- final FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), Properties);
- return kafkaConsumer;
+ public static FlinkKafkaConsumer getKafkaConsumer(String topics, Properties Properties){
+
+
+ List topicList = new ArrayList<>();
+ if (topics.contains(",")) {
+ String[] words = topics.split(",");
+ topicList.addAll(Arrays.asList(words));
+ } else {
+ topicList.add(topics);
+ }
+ return new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), Properties);
}
}
diff --git a/src/main/java/com/zdjizhi/utils/knowledgebase/IpLookupUtils.java b/src/main/java/com/zdjizhi/utils/knowledgebase/IpLookupUtils.java
deleted file mode 100644
index d219aa7..0000000
--- a/src/main/java/com/zdjizhi/utils/knowledgebase/IpLookupUtils.java
+++ /dev/null
@@ -1,180 +0,0 @@
-package com.zdjizhi.utils.knowledgebase;
-
-import cn.hutool.crypto.digest.DigestUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson2.*;
-import com.geedgenetworks.utils.IpLookupV2;
-import com.geedgenetworks.utils.StringUtil;
-import com.google.common.base.Joiner;
-import com.zdjizhi.common.pojo.KnowlegeBaseMeta;
-import com.zdjizhi.utils.connections.http.HttpClientService;
-import org.apache.flink.configuration.Configuration;
-import org.apache.http.client.utils.URIBuilder;
-
-import java.io.ByteArrayInputStream;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static com.zdjizhi.conf.DosConfigs.*;
-
-
-/**
- * @author wangchengcheng
- * @version 2023/11/10 15:23
- */
-public class IpLookupUtils {
- private static final Log logger = LogFactory.get();
- private final String ipBuiltInName = "ip_builtin.mmdb";
- private final String ipUserDefinedName = "ip_user_defined.mmdb";
-
- private Configuration configuration;
- /**
- * ip定位库
- */
- private static IpLookupV2 ipLookup;
-
- /**
- * 定位库默认分隔符
- */
- private static final String LOCATION_SEPARATOR = ".";
-
- /**
- * 最大重试次数
- */
- private static final int TRY_TIMES = 5;
-
- /**
- * http connections
- */
- private final HttpClientService httpClientService;
-
- /**
- * 定位库元数据缓存
- */
- private static final HashMap knowledgeMetaCache = new HashMap<>(16);
-
- private static String currentSha256IpUserDefined = "";
-
- private static String currentSha256IpBuiltin = "";
-
- public IpLookupUtils(Configuration configuration, HttpClientService httpClientService) {
- this.configuration = configuration;
- this.httpClientService = httpClientService;
- }
-
- public void stuffKnowledgeMetaCache() {
- KnowlegeBaseMeta ipBuiltinknowlegeBaseMeta = getKnowlegeBaseMeta(configuration.get(IP_BUILTIN_KD_ID));
-
- if (!currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256())) {
- String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipBuiltinknowlegeBaseMeta.getName(), ipBuiltinknowlegeBaseMeta.getFormat());
- knowledgeMetaCache.put(fileName, ipBuiltinknowlegeBaseMeta);
- }
- final KnowlegeBaseMeta ipUserDefinedknowlegeBaseMeta = getKnowlegeBaseMeta(configuration.get(IP_USER_DEFINED_KD_ID));
- if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256())) {
- String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipUserDefinedknowlegeBaseMeta.getName(), ipUserDefinedknowlegeBaseMeta.getFormat());
- knowledgeMetaCache.put(fileName, ipUserDefinedknowlegeBaseMeta);
- }
- if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256()) || !currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256())) {
- currentSha256IpBuiltin = ipBuiltinknowlegeBaseMeta.getSha256();
- currentSha256IpUserDefined = ipUserDefinedknowlegeBaseMeta.getSha256();
- reloadIpLookup();
- logger.debug("定位库信息重新加载成功,当前ip_builtin.mmdb的Sha256编码为:" + currentSha256IpBuiltin, "ip_user_defined.mmdb的Sha256编码为" + currentSha256IpUserDefined);
- }
- }
-
- /**
- * 从HDFS下载文件更新IpLookup
- */
- private void reloadIpLookup() {
- IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
- for (String fileName : knowledgeMetaCache.keySet()) {
- int retryNum = 0;
- KnowlegeBaseMeta knowlegeBaseMeta = knowledgeMetaCache.get(fileName);
- String metaSha256 = knowlegeBaseMeta.getSha256();
- while (retryNum < TRY_TIMES) {
- System.out.println("download file :" + fileName + ",HOS path :" + knowlegeBaseMeta.getPath());
- Long startTime = System.currentTimeMillis();
- byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), configuration.get(HTTP_SOCKET_TIMEOUT));
- if (httpGetByte != null && httpGetByte.length > 0) {
- String downloadFileSha256 = DigestUtil.sha256Hex(httpGetByte);
- if (metaSha256.equals(downloadFileSha256)) {
- ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte);
- switch (fileName) {
- case ipBuiltInName:
- builder.loadDataFile(inputStream);
- break;
- case ipUserDefinedName:
- builder.loadDataFilePrivate(inputStream);
- break;
- default:
- }
- System.out.println("update " + fileName + " finished, speed :" + (System.currentTimeMillis() - startTime) + "ms");
- retryNum = TRY_TIMES;
- } else {
- logger.error("通过HOS下载{}的sha256为:{} ,网关内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256, metaSha256, retryNum);
- retryNum++;
- }
- } else {
- logger.error("通过HOS下载{}的流为空 ,开始第{}次重试下载文件", fileName, retryNum);
- retryNum++;
- }
- }
- }
- ipLookup = builder.build();
- }
-
-
- /**
- * 根据配置组合生成知识库元数据过滤参数
- *
- * @return 过滤参数
- */
- private String getFilterParameter() {
- String expr = "[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined'))]";
- return expr;
- }
-
- private KnowlegeBaseMeta getKnowlegeBaseMeta(String kd_id) {
- KnowlegeBaseMeta knowlegeBaseMeta = null;
- String knowledgeInfo = null;
- try {
- URIBuilder uriBuilder = new URIBuilder(configuration.get(KNOWLEDGE_BASE_URL));
- HashMap parms = new HashMap<>();
- parms.put("kb_id", kd_id);
- httpClientService.setUrlWithParams(uriBuilder, configuration.get(KNOWLEDGE_BASE_PATH), parms);
- knowledgeInfo = httpClientService.httpGet(uriBuilder.build(), configuration.get(HTTP_SOCKET_TIMEOUT));
- if (knowledgeInfo.contains("200")) {
- final Map jsonObject = JSONObject.parseObject(knowledgeInfo, Map.class);
- logger.debug("获取kd_id为[" + kd_id + "]的knowledge_base成功,响应信息为" + jsonObject);
- JSONPath jsonPath = JSONPath.of(getFilterParameter());
- String extract = jsonPath.extract(JSONReader.of(jsonObject.get("data").toString())).toString();
- if (StringUtil.isNotBlank(extract)) {
- JSONArray jsonArray = JSON.parseArray(extract);
- if (jsonArray.size() > 0) {
- for (int i = 0; i < jsonArray.size(); i++) {
- knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class);
- }
- }
- }
- } else {
- logger.error("获取knowledge_base失败,请求回执为" + knowledgeInfo);
- }
- } catch (URISyntaxException e) {
- logger.error("构造URI异常", e);
- } catch (Exception e) {
- logger.error("获取knowledge_base失败", e);
- }
- return knowlegeBaseMeta;
- }
-
- public String getCountryLookup(String ip) {
- if (ipLookup != null) {
- return ipLookup.countryLookup(ip);
- } else {
- return null;
- }
- }
-
-}
diff --git a/src/main/resources/detection_dos_attack.properties b/src/main/resources/detection_dos_attack.properties
index b73018d..d3d6e2c 100644
--- a/src/main/resources/detection_dos_attack.properties
+++ b/src/main/resources/detection_dos_attack.properties
@@ -1,6 +1,6 @@
-source.kafka.topic=DOS-SKETCH-RECORD
+source.kafka.topic=DOS-SKETCH-METRIC,DOS-PROTECTION-RULE-METRIC
source.kafka.props.bootstrap.servers=192.168.44.12:9094
-source.kafka.props.group.id=dos-detection-job-20240116
+source.kafka.props.group.id=dos-detection-job-20240402-t1
source.kafka.props.session.timeout.ms=60000
source.kafka.props.max.poll.records=5000
source.kafka.props.max.partition.fetch.bytes=31457280
@@ -10,7 +10,7 @@ source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.Plain
#kafka sink
kafka.sink.event.topic.name=DOS-EVENT
-kafka.sink.metric.topic=TRAFFIC-TOP-DESTINATION-IP-METRIC
+kafka.sink.metric.topic=DOS-SKETCH-TOP-SERVER-IP
sink.kafka.props.bootstrap.servers=192.168.44.12:9094
sink.kafka.props.security.protocol=SASL_PLAINTEXT
sink.kafka.props.sasl.mechanism=PLAIN
@@ -33,7 +33,7 @@ hbase.zookeeper.quorum=192.168.44.12:2181
flink.watermark.max.orderness=30
#计算窗口大小,默认600s
-flink.window.max.time=60
+flink.window.max.time=600
#cm服务访问地址
bifang.server.uri=http://192.168.44.3
diff --git a/src/test/java/com/zdjizhi/common/NacosTest.java b/src/test/java/com/zdjizhi/common/NacosTest.java
deleted file mode 100644
index 00fe64b..0000000
--- a/src/test/java/com/zdjizhi/common/NacosTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.zdjizhi.common;
-
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2022/3/1016:58
- */
-public class NacosTest {
-
- /**
- *
- * com.alibaba.nacos
- * nacos-client
- * 1.2.0
- *
- */
-
- private static Properties properties = new Properties();
- /**
- * config data id = config name
- */
- private static final String DATA_ID = "dos_baseline.properties";
- /**
- * config group
- */
- private static final String GROUP = "Galaxy";
-
- private void getProperties() {
- properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
- properties.setProperty(PropertyKeyConst.NAMESPACE, "test");
- properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
- properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
- }
-
-
- @Test
- public void GetConfigurationTest() {
- try {
- getProperties();
- ConfigService configService = NacosFactory.createConfigService(properties);
- String content = configService.getConfig(DATA_ID, GROUP, 5000);
- Properties nacosConfigMap = new Properties();
- nacosConfigMap.load(new StringReader(content));
-// System.out.println(FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- @Test
- public void ListenerConfigurationTest() {
- getProperties();
- try {
- //first get config
- ConfigService configService = NacosFactory.createConfigService(properties);
- String config = configService.getConfig(DATA_ID, GROUP, 5000);
-// System.out.println(config);
-
- //start listenner
- configService.addListener(DATA_ID, GROUP, new Listener() {
- @Override
- public Executor getExecutor() {
- return null;
- }
-
- @Override
- public void receiveConfigInfo(String configMsg) {
- System.out.println(configMsg);
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- //keep running,change nacos config,print new config
- /*
- while (true) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- */
- }
-}
diff --git a/src/test/java/com/zdjizhi/etl/DosDetectionTest.java b/src/test/java/com/zdjizhi/etl/DosDetectionTest.java
index f67bf5f..97b2384 100644
--- a/src/test/java/com/zdjizhi/etl/DosDetectionTest.java
+++ b/src/test/java/com/zdjizhi/etl/DosDetectionTest.java
@@ -44,21 +44,21 @@ public class DosDetectionTest {
DosSketchLog dosSketchLog = new DosSketchLog ();
- dosSketchLog.setSketch_sessions(68);
- dosSketchLog.setSketch_packets(68);
- dosSketchLog.setSketch_bytes(285820);//185.82
+ dosSketchLog.setSessions(68);
+ dosSketchLog.setPkts(68);
+ dosSketchLog.setBytes(285820);//185.82
dosSketchLog.setVsys_id(1);
dosSketchLog.setAttack_type("ICMP Flood");
- dosSketchLog.setSource_ip("45.170.244.25");
- dosSketchLog.setDestination_ip("24.152.57.56");
+ dosSketchLog.setServer_ip("45.170.244.25");
+ dosSketchLog.setClient_ip("24.152.57.56");
//静态阈值获取
long sessionBase = dosDetectionThreshold.getSessions_per_sec();
long pktBase=dosDetectionThreshold.getPackets_per_sec();
long bitBase=dosDetectionThreshold.getBits_per_sec();
//基于速率进行计算
- long diffSession = dosSketchLog.getSketch_sessions() - sessionBase;
- long diffPkt = dosSketchLog.getSketch_packets() - pktBase;
- long diffByte = dosSketchLog.getSketch_bytes() - bitBase;
+ long diffSession = dosSketchLog.getSessions() - sessionBase;
+ long diffPkt = dosSketchLog.getPkts() - pktBase;
+ long diffByte = dosSketchLog.getBytes() - bitBase;
Double diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
@@ -85,7 +85,7 @@ public class DosDetectionTest {
}
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {
DosEventLog result = null;
- String destinationIp = value.getDestination_ip();
+ String destinationIp = value.getServer_ip();
String attackType = value.getAttack_type();
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
@@ -94,7 +94,7 @@ public class DosDetectionTest {
if (severity != Severity.NORMAL) {
if (type == BASELINE_CONDITION_TYPE && percent < 0.2) {
// logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
- }else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold){
+ }else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSessions() < staticSensitivityThreshold){
// logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}",destinationIp, attackType, base, percent, value);
}else {
result = getResult(value, base, profileId, severity, percent+1, type, tag);
@@ -115,21 +115,21 @@ public class DosDetectionTest {
DosEventLog dosEventLog = new DosEventLog();
// dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setVsys_id(value.getVsys_id());
- dosEventLog.setStart_time(value.getSketch_start_time());
- dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
+ dosEventLog.setStart_time(value.getStart_timestamp_ms());
+ dosEventLog.setEnd_time(value.getStart_timestamp_ms() + value.getDuration());
dosEventLog.setProfile_id(profileId);
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
// dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
- dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag,dosEventLog));
- dosEventLog.setDestination_ip(value.getDestination_ip());
+ dosEventLog.setConditions(getConditions(percent, base, value.getSessions(), type, tag,dosEventLog));
+ dosEventLog.setDestination_ip(value.getServer_ip());
// dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
- String ipList = value.getSource_ip();
+ String ipList = value.getClient_ip();
dosEventLog.setSource_ip_list(ipList);
dosEventLog.setSource_country_list(getSourceCountryList(ipList));
- dosEventLog.setSession_rate(value.getSketch_sessions());
- dosEventLog.setPacket_rate(value.getSketch_packets());
- dosEventLog.setBit_rate(value.getSketch_bytes());
+ dosEventLog.setSession_rate(value.getSession_rate());
+ dosEventLog.setPacket_rate(value.getPacket_rate());
+ dosEventLog.setBit_rate(value.getBit_rate());
return dosEventLog;
}