34 Commits

Author SHA1 Message Date
wangchengcheng
a17666abff fix:Modify the method of obtaining DoS Detection task knowledge base(TSG-17971) 2023-12-25 10:51:02 +08:00
wangchengcheng
322bb1e4cb fix:Adapt to CM interface(TSG-18052) 2023-12-21 18:25:54 +08:00
wangchengcheng
bdfe5f73db fix:recv_time fill value error 2023-12-18 13:49:53 +08:00
wangchengcheng
91cb1ce5d2 OLAP DoS Detection重组日志结构适配。(TSG-17836) 2023-12-11 09:50:28 +08:00
wangchengcheng
52336accbd 1.适配IP定位库v4/v6合并后的加载逻辑(GAL-436)。
2.适配bifang23.11接口(TSG-17675)。
3.移除广播流。
4.修复静态阈值判断部分的BUG。
2023-11-13 16:45:04 +08:00
unknown
c8a2a6b627 TSG-16920 修复自定义静态阈值满足任意阈值条件均产生告警 2023-09-07 18:20:02 +08:00
unknown
24d70f690e TSG-15712 修正DoS基线阈值告警信息中告警严重程度与告警值不匹配问题 2023-06-27 17:31:56 +08:00
unknown
77e982b22f TSG-15286 静态阈值新增业务测试用例 2023-06-09 10:36:54 +08:00
unknown
b3a23686a0 GAL-352 zdjizhi 1.1.1更换为zdjizhi 1.1.3 2023-06-08 17:01:26 +08:00
unknown
b9a694ddb9 GAL-349 优化DoS检测程序知识库更新流程 2023-06-08 16:57:19 +08:00
unknown
6fb37324ff GAL-352 获取SketchLog及bifang静态阈值适配Fastjson2 2023-06-07 14:29:53 +08:00
unknown
315b638470 GAL-352 DoS检测适配Fastjson2序列化库 2023-06-06 17:53:06 +08:00
unknown
bd48417eb8 删除注释代码 2023-05-26 15:51:57 +08:00
unknown
72acc976e3 TSG-15219 修复静态阈值的condition处理逻辑,新增静态阈值单元测试类 2023-05-26 15:44:37 +08:00
unknown
6be3ea7f1e TSG-15219 优化DoS静态阈值下的检测逻辑 2023-05-24 14:36:29 +08:00
unknown
04ee45f77d TSG-15167 新增知识库文件校验功能 2023-05-23 10:38:15 +08:00
unknown
d8b0a7637b 新增命中静态阈值后填充Profile ID 2023-04-03 17:35:36 +08:00
unknown
b56a2ec31e GAL-306 修复DoS检测不能读取HDFS中IP定位库问题,支持yarn per-job运行模式 2023-03-27 17:15:27 +08:00
unknown
11747d9964 GAL-296 解决使用Yarn模式运行时的依赖冲突问题 2023-03-07 18:58:25 +08:00
wanglihui
ce15a27a1b TSG-13094 修复DoS Event日志出现MVsys id 2022-12-21 17:11:14 +08:00
wanglihui
01bbe562c9 Merge branch 'tsg-22.11' of git.mesalab.cn:bigdata/tsg/flink-dos-detection into tsg-22.11 2022-12-19 10:18:44 +08:00
wanglihui
f07651cf14 修改vsys id字段名为common_t_vsys_id 2022-12-19 10:18:17 +08:00
unknown
7c201a8a3f 新增Nacos Namespace配置,删除更新至HDFS时Flush操作 2022-12-16 16:52:33 +08:00
wanglihui
78435d54ea Merge branch 'knowledge' of https://git.mesalab.cn/bigdata/tsg/flink-dos-detection into tsg-22.11 2022-12-06 19:11:30 +08:00
wanglihui
76c9247bb9 Merge branch 'knowledge' of https://git.mesalab.cn/bigdata/tsg/flink-dos-detection into tsg-22.11 2022-12-06 19:10:28 +08:00
wanglihui
488b7c6644 修改部分日志输出 2022-12-06 17:13:09 +08:00
unknown
0662d265dd GAL-224 取消SSL检测,新增HDFS高可用设置 2022-11-28 16:42:56 +08:00
unknown
87fe11dc93 优化单机模式落地方式 2022-11-28 15:38:36 +08:00
unknown
9a2a5b3957 GAL-224 DoS检测支持知识库动态加载 2022-11-23 15:30:24 +08:00
fy
c58acdcfc9 Flink连接知识库实现方案初始准备 2022-11-14 14:34:00 +08:00
wanglihui
b409150532 修复提交yarn集群依赖冲突bug 2022-10-17 18:15:11 +08:00
wanglihui
7e6d5fcfc5 修复无法vsys id 2022-10-17 11:10:35 +08:00
wanglihui
859cd379e5 DoS 检测支持vsys id 2022-09-23 18:37:33 +08:00
wanglihui
47ddef9bca DoS 检测事件日志默认VSYS ID 为 1 2022-08-19 10:17:52 +08:00
41 changed files with 1665 additions and 788 deletions

135
pom.xml
View File

@@ -6,19 +6,26 @@
<groupId>com.zdjizhi</groupId>
<artifactId>flink-dos-detection</artifactId>
<version>1.0-SNAPSHOT</version>
<version>23.12</version>
<name>flink-dos-detection</name>
<url>http://www.example.com</url>
<properties>
<galaxy.tools.version>1.2.1</galaxy.tools.version>
<flink.version>1.13.1</flink.version>
<hive.version>2.1.1</hive.version>
<hadoop.version>2.7.1</hadoop.version>
<scala.binary.version>2.12</scala.binary.version>
<jsonpath.version>2.4.0</jsonpath.version>
</properties>
<repositories>
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.125:8099/content/groups/public</url>
<url>http://192.168.40.153:8099/content/groups/public</url>
</repository>
<repository>
@@ -96,8 +103,27 @@
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
<directory>properties</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src\main</directory>
<includes>
<include>log4j.properties</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
<dependencies>
@@ -109,23 +135,28 @@
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.slf4j</groupId>-->
<!--<artifactId>slf4j-api</artifactId>-->
<!--<version>1.7.21</version>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>org.slf4j</groupId>-->
<!--<artifactId>slf4j-log4j12</artifactId>-->
<!--<version>1.7.21</version>-->
<!--</dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>${jsonpath.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- CLI dependencies -->
@@ -133,12 +164,13 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>zookeeper</artifactId>
@@ -152,15 +184,30 @@
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.hadoop</groupId>-->
<!-- <artifactId>hadoop-hdfs</artifactId>-->
<!-- <version>${hadoop.version}</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.3</version>
<!--<scope>provided</scope>-->
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@@ -170,6 +217,14 @@
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
@@ -207,24 +262,11 @@
<version>5.3.3</version>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.8</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.32</version>
</dependency>
<dependency>
@@ -244,6 +286,10 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
@@ -271,7 +317,32 @@
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks</groupId>
<artifactId>galaxy</artifactId>
<version>${galaxy.tools.version}</version>
</dependency>
</dependencies>
</project>
</project>

View File

@@ -2,120 +2,117 @@ package com.zdjizhi.common;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
/**
* @author wlh
*/
public class DosDetectionThreshold implements Serializable {
private String profileId;
private String attackType;
private ArrayList<String> serverIpList;
private String serverIpAddr;
private long packetsPerSec;
private long bitsPerSec;
private long sessionsPerSec;
private int isValid;
private long profile_id;
private String attack_type;
private ArrayList<String> server_ip_list;
private String server_ip_addr;
private long packets_per_sec;
private long bits_per_sec;
private long sessions_per_sec;
private int is_valid;
private int vsys_id;
private Integer[] superior_ids;
public long getProfile_id() {
return profile_id;
}
public void setProfile_id(long profile_id) {
this.profile_id = profile_id;
}
public String getAttack_type() {
return attack_type;
}
public void setAttack_type(String attack_type) {
this.attack_type = attack_type;
}
public ArrayList<String> getServer_ip_list() {
return server_ip_list;
}
public void setServer_ip_list(ArrayList<String> server_ip_list) {
this.server_ip_list = server_ip_list;
}
public String getServer_ip_addr() {
return server_ip_addr;
}
public void setServer_ip_addr(String server_ip_addr) {
this.server_ip_addr = server_ip_addr;
}
public long getPackets_per_sec() {
return packets_per_sec;
}
public void setPackets_per_sec(long packets_per_sec) {
this.packets_per_sec = packets_per_sec;
}
public long getBits_per_sec() {
return bits_per_sec;
}
public void setBits_per_sec(long bits_per_sec) {
this.bits_per_sec = bits_per_sec;
}
public long getSessions_per_sec() {
return sessions_per_sec;
}
public void setSessions_per_sec(long sessions_per_sec) {
this.sessions_per_sec = sessions_per_sec;
}
public int getIs_valid() {
return is_valid;
}
public void setIs_valid(int is_valid) {
this.is_valid = is_valid;
}
public int getVsys_id() {
return vsys_id;
}
public void setVsys_id(int vsys_id) {
this.vsys_id = vsys_id;
}
public Integer[] getSuperior_ids() {
return superior_ids;
}
public void setSuperior_ids(Integer[] superior_ids) {
this.superior_ids = superior_ids;
}
@Override
public String toString() {
return "DosDetectionThreshold{" +
"profileId='" + profileId + '\'' +
", attackType='" + attackType + '\'' +
", serverIpList=" + serverIpList +
", serverIpAddr='" + serverIpAddr + '\'' +
", packetsPerSec=" + packetsPerSec +
", bitsPerSec=" + bitsPerSec +
", sessionsPerSec=" + sessionsPerSec +
", isValid=" + isValid +
"profile_id=" + profile_id +
", attack_type='" + attack_type + '\'' +
", server_ip_list=" + server_ip_list +
", server_ip_addr='" + server_ip_addr + '\'' +
", packets_per_sec=" + packets_per_sec +
", bits_per_sec=" + bits_per_sec +
", sessions_per_sec=" + sessions_per_sec +
", is_valid=" + is_valid +
", vsys_id=" + vsys_id +
", superior_ids=" + Arrays.toString(superior_ids) +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DosDetectionThreshold threshold = (DosDetectionThreshold) o;
return packetsPerSec == threshold.packetsPerSec &&
bitsPerSec == threshold.bitsPerSec &&
sessionsPerSec == threshold.sessionsPerSec &&
isValid == threshold.isValid &&
Objects.equals(profileId, threshold.profileId) &&
Objects.equals(attackType, threshold.attackType) &&
Objects.equals(serverIpList, threshold.serverIpList) &&
Objects.equals(serverIpAddr, threshold.serverIpAddr);
}
@Override
public int hashCode() {
return Objects.hash(profileId, attackType, serverIpList, serverIpAddr, packetsPerSec, bitsPerSec, sessionsPerSec, isValid);
}
public String getProfileId() {
return profileId;
}
public void setProfileId(String profileId) {
this.profileId = profileId;
}
public String getAttackType() {
return attackType;
}
public void setAttackType(String attackType) {
this.attackType = attackType;
}
public ArrayList<String> getServerIpList() {
return serverIpList;
}
public void setServerIpList(ArrayList<String> serverIpList) {
this.serverIpList = serverIpList;
}
public String getServerIpAddr() {
return serverIpAddr;
}
public void setServerIpAddr(String serverIpAddr) {
this.serverIpAddr = serverIpAddr;
}
public long getPacketsPerSec() {
return packetsPerSec;
}
public void setPacketsPerSec(long packetsPerSec) {
this.packetsPerSec = packetsPerSec;
}
public long getBitsPerSec() {
return bitsPerSec;
}
public void setBitsPerSec(long bitsPerSec) {
this.bitsPerSec = bitsPerSec;
}
public long getSessionsPerSec() {
return sessionsPerSec;
}
public void setSessionsPerSec(long sessionsPerSec) {
this.sessionsPerSec = sessionsPerSec;
}
public int getIsValid() {
return isValid;
}
public void setIsValid(int isValid) {
this.isValid = isValid;
}
}

View File

@@ -1,13 +1,14 @@
package com.zdjizhi.common;
import java.io.Serializable;
import java.util.Objects;
public class DosEventLog implements Serializable {
public class DosEventLog implements Serializable, Cloneable {
private long recv_time;
private long log_id;
private int vsys_id;
private long start_time;
private long end_time;
private long profile_id;
private String attack_type;
private String severity;
private String conditions;
@@ -19,6 +20,14 @@ public class DosEventLog implements Serializable {
private long packet_rate;
private long bit_rate;
public long getRecv_time() {
return recv_time;
}
public void setRecv_time(long recv_time) {
this.recv_time = recv_time;
}
public long getLog_id() {
return log_id;
}
@@ -27,6 +36,14 @@ public class DosEventLog implements Serializable {
this.log_id = log_id;
}
public int getVsys_id() {
return vsys_id;
}
public void setVsys_id(int vsys_id) {
this.vsys_id = vsys_id;
}
public long getStart_time() {
return start_time;
}
@@ -43,6 +60,14 @@ public class DosEventLog implements Serializable {
this.end_time = end_time;
}
public long getProfile_id() {
return profile_id;
}
public void setProfile_id(long profile_id) {
this.profile_id = profile_id;
}
public String getAttack_type() {
return attack_type;
}
@@ -125,10 +150,13 @@ public class DosEventLog implements Serializable {
@Override
public String toString() {
return "dosEventLog{" +
"log_id=" + log_id +
return "DosEventLog{" +
"recv_time=" + recv_time +
", log_id=" + log_id +
", vsys_id=" + vsys_id +
", start_time=" + start_time +
", end_time=" + end_time +
", profile_id=" + profile_id +
", attack_type='" + attack_type + '\'' +
", severity='" + severity + '\'' +
", conditions='" + conditions + '\'' +
@@ -143,31 +171,7 @@ public class DosEventLog implements Serializable {
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DosEventLog)) {
return false;
}
DosEventLog that = (DosEventLog) o;
return getLog_id() == that.getLog_id() &&
getStart_time() == that.getStart_time() &&
getEnd_time() == that.getEnd_time() &&
getSession_rate() == that.getSession_rate() &&
getPacket_rate() == that.getPacket_rate() &&
getBit_rate() == that.getBit_rate() &&
Objects.equals(getAttack_type(), that.getAttack_type()) &&
Objects.equals(getSeverity(), that.getSeverity()) &&
Objects.equals(getConditions(), that.getConditions()) &&
Objects.equals(getDestination_ip(), that.getDestination_ip()) &&
Objects.equals(getDestination_country(), that.getDestination_country()) &&
Objects.equals(getSource_ip_list(), that.getSource_ip_list()) &&
Objects.equals(getSource_country_list(), that.getSource_country_list());
}
@Override
public int hashCode() {
return Objects.hash(getLog_id(), getStart_time(), getEnd_time(), getAttack_type(), getSeverity(), getConditions(), getDestination_ip(), getDestination_country(), getSource_ip_list(), getSource_country_list(), getSession_rate(), getPacket_rate(), getBit_rate());
public Object clone() throws CloneNotSupportedException {
return super.clone();
}
}

View File

@@ -12,6 +12,7 @@ public class DosMetricsLog implements Serializable {
private long packet_rate;
private long bit_rate;
private int partition_num;
private int vsys_id;
public int getPartition_num() {
return partition_num;
@@ -69,6 +70,14 @@ public class DosMetricsLog implements Serializable {
this.bit_rate = bit_rate;
}
public int getVsys_id() {
return vsys_id;
}
public void setVsys_id(int vsys_id) {
this.vsys_id = vsys_id;
}
@Override
public String toString() {
return "DosMetricsLog{" +
@@ -79,29 +88,7 @@ public class DosMetricsLog implements Serializable {
", packet_rate=" + packet_rate +
", bit_rate=" + bit_rate +
", partition_num=" + partition_num +
", vsys_id=" + vsys_id +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DosMetricsLog)) {
return false;
}
DosMetricsLog that = (DosMetricsLog) o;
return getSketch_start_time() == that.getSketch_start_time() &&
getSession_rate() == that.getSession_rate() &&
getPacket_rate() == that.getPacket_rate() &&
getBit_rate() == that.getBit_rate() &&
getPartition_num() == that.getPartition_num() &&
Objects.equals(getAttack_type(), that.getAttack_type()) &&
Objects.equals(getDestination_ip(), that.getDestination_ip());
}
@Override
public int hashCode() {
return Objects.hash(getSketch_start_time(), getAttack_type(), getDestination_ip(), getSession_rate(), getPacket_rate(), getBit_rate(), getPartition_num());
}
}

View File

@@ -5,6 +5,7 @@ import java.util.Objects;
public class DosSketchLog implements Serializable {
private long common_recv_time;
private String common_sled_ip;
private String common_data_center;
private long sketch_start_time;
@@ -15,11 +16,14 @@ public class DosSketchLog implements Serializable {
private long sketch_sessions;
private long sketch_packets;
private long sketch_bytes;
private int vsys_id;
@Override
public String toString() {
return "DosSketchLog{" +
"common_sled_ip='" + common_sled_ip + '\'' +
"common_recv_time=" + common_recv_time +
", common_sled_ip='" + common_sled_ip + '\'' +
", common_data_center='" + common_data_center + '\'' +
", sketch_start_time=" + sketch_start_time +
", sketch_duration=" + sketch_duration +
@@ -29,33 +33,16 @@ public class DosSketchLog implements Serializable {
", sketch_sessions=" + sketch_sessions +
", sketch_packets=" + sketch_packets +
", sketch_bytes=" + sketch_bytes +
", vsys_id=" + vsys_id +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DosSketchLog)) {
return false;
}
DosSketchLog sketchLog = (DosSketchLog) o;
return getSketch_start_time() == sketchLog.getSketch_start_time() &&
getSketch_duration() == sketchLog.getSketch_duration() &&
getSketch_sessions() == sketchLog.getSketch_sessions() &&
getSketch_packets() == sketchLog.getSketch_packets() &&
getSketch_bytes() == sketchLog.getSketch_bytes() &&
Objects.equals(getCommon_sled_ip(), sketchLog.getCommon_sled_ip()) &&
Objects.equals(getCommon_data_center(), sketchLog.getCommon_data_center()) &&
Objects.equals(getAttack_type(), sketchLog.getAttack_type()) &&
Objects.equals(getSource_ip(), sketchLog.getSource_ip()) &&
Objects.equals(getDestination_ip(), sketchLog.getDestination_ip());
public long getCommon_recv_time() {
return common_recv_time;
}
@Override
public int hashCode() {
return Objects.hash(getCommon_sled_ip(), getCommon_data_center(), getSketch_start_time(), getSketch_duration(), getAttack_type(), getSource_ip(), getDestination_ip(), getSketch_sessions(), getSketch_packets(), getSketch_bytes());
public void setCommon_recv_time(long common_recv_time) {
this.common_recv_time = common_recv_time;
}
public String getCommon_sled_ip() {
@@ -137,4 +124,12 @@ public class DosSketchLog implements Serializable {
public void setSketch_bytes(long sketch_bytes) {
this.sketch_bytes = sketch_bytes;
}
public int getVsys_id() {
return vsys_id;
}
public void setVsys_id(int vsys_id) {
this.vsys_id = vsys_id;
}
}

View File

@@ -1,22 +1,32 @@
package com.zdjizhi.common;
import java.util.Objects;
import java.util.Arrays;
public class DosVsysId {
private int vsysId;
private Integer id;
private Integer[] superior_ids;
public int getVsysId() {
return vsysId;
public Integer getId() {
return id;
}
public void setVsysId(int vsysId) {
this.vsysId = vsysId;
public void setId(Integer id) {
this.id = id;
}
public Integer[] getSuperior_ids() {
return superior_ids;
}
public void setSuperior_ids(Integer[] superior_ids) {
this.superior_ids = superior_ids;
}
@Override
public String toString() {
return "DosVsysId{" +
"vsysId=" + vsysId +
"id=" + id +
", superior_ids=" + Arrays.toString(superior_ids) +
'}';
}
}

View File

@@ -1,14 +1,17 @@
package com.zdjizhi.common;
import com.zdjizhi.utils.CommonConfigurations;
import com.zdjizhi.utils.NacosUtils;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
* @author wlh
* @date 2021/1/6
*/
public class CommonConfig {
public class FlowWriteConfig {
/**
* 定位库默认分隔符
*/
private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
@@ -47,19 +50,8 @@ public class CommonConfig {
public static final int DESTINATION_IP_PARTITION_NUM = CommonConfigurations.getIntProperty("destination.ip.partition.num");
public static final int DATA_CENTER_ID_NUM = CommonConfigurations.getIntProperty("data.center.id.num");
public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path");
// public static final int STATIC_SENSITIVITY_THRESHOLD = NacosUtils.getIntProperty("static.sensitivity.threshold");
// public static final double BASELINE_SENSITIVITY_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sensitivity.threshold");
//
// public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold");
// public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold");
// public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.major.threshold");
// public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold");
// public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold");
public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri");
public static final String BIFANG_SERVER_TOKEN = CommonConfigurations.getStringProperty("bifang.server.token");
public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = CommonConfigurations.getStringProperty("bifang.server.encryptpwd.path");
public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path");
public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path");
@@ -80,17 +72,34 @@ public class CommonConfig {
public static final int SASL_JAAS_CONFIG_FLAG = CommonConfigurations.getIntProperty("sasl.jaas.config.flag");
public static void main(String[] args) {
StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
// 配置加密解密的密码/salt值
encryptor.setPassword("galaxy");
// "raw_password"进行加密S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p
// String password = "galaxy2019";
String password = "nacos";
String encPwd = encryptor.encrypt(password);
System.out.println(encPwd);
// 再进行解密raw_password
String rawPwd = encryptor.decrypt(encPwd);
System.out.println(rawPwd);
}
public static final Integer HTTP_SOCKET_TIMEOUT = CommonConfigurations.getIntProperty("http.socket.timeout");
public static final Long KNOWLEDGE_EXECUTION_INTERVAL = CommonConfigurations.getLongProperty("knowledge.execution.interval");
public static final String KNOWLEDGE_BASE_URL = CommonConfigurations.getStringProperty("knowledge.base.uri");
public static final String KNOWLEDGE_BASE_PATH = CommonConfigurations.getStringProperty("knowledge.base.path");
public static final String IP_USER_DEFINED_KD_ID = CommonConfigurations.getStringProperty("ip.user.defined.kd.id");
public static final String IP_BUILTIN_KD_ID = CommonConfigurations.getStringProperty("ip.builtin.kd.id");
public static final String BIFANG_SERVER_TOKEN = CommonConfigurations.getStringProperty("bifang.server.token");
public static final Integer STATIC_SENSITIVITY_THRESHOLD = CommonConfigurations.getIntProperty("static.sensitivity.threshold");
public static final Double BASELINE_SENSITIVITY_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sensitivity.threshold");
public static final Double BASELINE_SESSIONS_MINOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.minor.threshold");
public static final Double BASELINE_SESSIONS_WARNING_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.warning.threshold");
public static final Double BASELINE_SESSIONS_MAJOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.major.threshold");
public static final Double BASELINE_SESSIONS_SEVERE_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.severe.threshold");
public static final Double BASELINE_SESSIONS_CRITICAL_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.critical.threshold");
}

View File

@@ -0,0 +1,74 @@
package com.zdjizhi.common.pojo;
import java.io.Serializable;
/**
*
*/
public class KnowlegeBaseMeta implements Serializable {
private String kb_id;
private String name;
private String sha256;
private String format;
private String path;
public KnowlegeBaseMeta(String kd_id, String name, String sha256, String format, String path) {
this.kb_id = kd_id;
this.name = name;
this.sha256 = sha256;
this.format = format;
this.path = path;
}
public String getKb_id() {
return kb_id;
}
public void setKb_id(String kb_id) {
this.kb_id = kb_id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSha256() {
return sha256;
}
public void setSha256(String sha256) {
this.sha256 = sha256;
}
public String getFormat() {
return format;
}
public void setFormat(String format) {
this.format = format;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
@Override
public String toString() {
return "KnowlegeBaseMeta{" +
"kb_id='" + kb_id + '\'' +
", name='" + name + '\'' +
", sha256='" + sha256 + '\'' +
", format='" + format + '\'' +
", path='" + path + '\'' +
'}';
}
}

View File

@@ -1,17 +1,19 @@
package com.zdjizhi.etl;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.geedgenetworks.utils.DateUtils;
import com.geedgenetworks.utils.StringUtil;
import com.zdjizhi.common.*;
import com.zdjizhi.utils.*;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.text.NumberFormat;
@@ -23,12 +25,12 @@ import java.util.concurrent.TimeUnit;
/**
* @author wlh
*/
public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);
private static final Log logger = LogFactory.get();
private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
private HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap;
private HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap;
private final static int BASELINE_SIZE = 144;
private final static int STATIC_CONDITION_TYPE = 1;
@@ -41,16 +43,20 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
private final static int OTHER_BASELINE_TYPE = 3;
@Override
public void open(Configuration parameters) {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build());
try {
super.open(parameters);
executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0,
CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
FlowWriteConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
executorService.scheduleAtFixedRate(() -> baselineMap = ParseBaselineThreshold.readFromHbase(), 0,
CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
FlowWriteConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
} catch (Exception e) {
logger.error("定时器任务执行失败", e);
}
@@ -58,85 +64,116 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
}
@Override
public DosEventLog map(DosSketchLog value) {
public void processElement(DosSketchLog value, Context ctx, Collector<DosEventLog> out) throws Exception {
DosEventLog finalResult = null;
try {
String destinationIp = value.getDestination_ip();
int vsysId = value.getVsys_id();
String key = destinationIp + "-" + vsysId;
String attackType = value.getAttack_type();
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
DosDetectionThreshold threshold = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
logger.debug("当前判断IP{}, 类型: {}", destinationIp, attackType);
if (threshold == null && baselineMap.containsKey(destinationIp)) {
finalResult = getDosEventLogByBaseline(value);
} else if (threshold == null && !baselineMap.containsKey(destinationIp)) {
DosDetectionThreshold threshold = null;
if (thresholdRangeMap.containsKey(vsysId)) {
threshold = thresholdRangeMap.get(vsysId).getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
}
logger.debug("当前判断IP{}, 类型: {}", key, attackType);
if (threshold == null && baselineMap.containsKey(key)) {
finalResult = getDosEventLogByBaseline(value, key);
} else if (threshold == null && !baselineMap.containsKey(key)) {
finalResult = getDosEventLogBySensitivityThreshold(value);
} else if (threshold != null) {
finalResult = getDosEventLogByStaticThreshold(value, threshold);
} else {
logger.debug("未获取到当前server IP{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType);
logger.debug("未获取到当前server IP{} 类型 {} 静态阈值 和 baseline", key, attackType);
}
} catch (Exception e) {
logger.error("判定失败\n {} \n{}", value, e);
}
return finalResult;
if (finalResult != null) {
out.collect(finalResult);
}
}
private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
DosEventLog result = null;
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) {
long diff = sketchSessions - NacosUtils.getIntProperty("static.sensitivity.threshold");
result = getDosEventLog(value, NacosUtils.getIntProperty("static.sensitivity.threshold"), diff, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
result.setSeverity(Severity.MAJOR.severity);
}
return result;
Integer staticSensitivityThreshold = FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD;
long diff = sketchSessions - staticSensitivityThreshold;
return getDosEventLog(value, staticSensitivityThreshold, diff, 0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
}
private DosEventLog getDosEventLogByBaseline(DosSketchLog value) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String key) {
String attackType = value.getAttack_type();
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) {
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(dosBaselineThreshold, value);
long diff = sketchSessions - base;
result = getDosEventLog(value, base, diff, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType);
Integer base = getBaseValue(dosBaselineThreshold, value);
long diff = sketchSessions - base;
return getDosEventLog(value, base, diff, 0, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
}
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) throws CloneNotSupportedException {
long sessionBase = threshold.getSessions_per_sec();
long pktBase = threshold.getPackets_per_sec();
long bitBase = threshold.getBits_per_sec();
long diffSession = value.getSketch_sessions() - sessionBase;
long diffPkt = value.getSketch_packets() - pktBase;
long diffByte = value.getSketch_bytes() - bitBase;
double diffSessionPercent = 0.0;
double diffPktPercent = 0.0;
double diffBitPercent = 0.0;
//todo 代码Review发现该部分存在bug23.11版本做修复,需测试。
if (sessionBase > 0) {
diffSessionPercent = getDiffPercent(diffSession, sessionBase) * 100;
}
if (pktBase > 0) {
diffPktPercent = getDiffPercent(diffPkt, pktBase) * 100;
}
if (bitBase > 0) {
diffBitPercent = getDiffPercent(diffByte, bitBase) * 100;
}
long profileId = 0;
DosEventLog result = null;
if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent) {
profileId = threshold.getProfile_id();
result = getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
} else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent) {
profileId = threshold.getProfile_id();
result = getDosEventLog(value, pktBase, diffPkt, profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
} else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent) {
profileId = threshold.getProfile_id();
result = getDosEventLog(value, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
}
return result;
}
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) {
long base = threshold.getSessionsPerSec();
long diff = value.getSketch_sessions() - base;
DosEventLog result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, SESSIONS_TAG);
if (result == null) {
base = threshold.getPacketsPerSec();
diff = value.getSketch_packets() - base;
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, PACKETS_TAG);
if (result == null) {
base = threshold.getBitsPerSec();
diff = value.getSketch_bytes() - base;
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, BITS_TAG);
}
}
return result;
}
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, int type, String tag) {
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
Severity severity = judgeSeverity(percent);
Integer staticSensitivityThreshold = FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD;
if (severity != Severity.NORMAL) {
if (type == BASELINE_CONDITION_TYPE && percent < NacosUtils.getDoubleProperty("baseline.sensitivity.threshold")) {
if (type == BASELINE_CONDITION_TYPE && percent < FlowWriteConfig.BASELINE_SENSITIVITY_THRESHOLD) {
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
} else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold) {
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
} else {
result = getResult(value, base, severity, percent+1, type, tag);
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result);
// result = getResult(value, base, profileId, severity, percent+1, type, tag);
result = getResult(value, base, profileId, severity, percent, type, tag);
if (type == SENSITIVITY_CONDITION_TYPE) {
result.setSeverity(Severity.MAJOR.severity);
}
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp, attackType, base, percent, type, tag, result);
}
} else {
logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value);
@@ -145,16 +182,20 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return result;
}
private DosEventLog getResult(DosSketchLog value, long base, Severity severity, double percent, int type, String tag) {
private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) {
DosEventLog dosEventLog = new DosEventLog();
dosEventLog.setRecv_time(value.getCommon_recv_time());
dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setVsys_id(value.getVsys_id());
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
dosEventLog.setProfile_id(profileId);
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag, dosEventLog));
// dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag,dosEventLog));
dosEventLog.setDestination_ip(value.getDestination_ip());
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
dosEventLog.setDestination_country(IpLookupUtils.getCountryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
dosEventLog.setSource_ip_list(ipList);
dosEventLog.setSource_country_list(getSourceCountryList(ipList));
@@ -178,8 +219,8 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}", value.getDestination_ip(), value.getAttack_type(), defaultVaule);
base = defaultVaule;
}
if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < NacosUtils.getIntProperty("static.sensitivity.threshold")){
base = NacosUtils.getIntProperty("static.sensitivity.threshold");
if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD) {
base = FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD;
}
}
}
@@ -189,25 +230,31 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return base;
}
private String getConditions(String percent, long base, long sessions, int type, String tag) {
private String getConditions(String percent, long base, long sessions, int type, String tag, DosEventLog dosEventLog) {
int condition = 0;
if ("Minor".equals(dosEventLog.getSeverity())) {
condition = 50;
} else if ("Warning".equals(dosEventLog.getSeverity())) {
condition = 100;
} else if ("Major".equals(dosEventLog.getSeverity())) {
condition = 250;
} else if ("Severe".equals(dosEventLog.getSeverity())) {
condition = 500;
} else if ("Critical".equals(dosEventLog.getSeverity())) {
condition = 800;
}
switch (type) {
case STATIC_CONDITION_TYPE:
return new StrBuilder()
.append("Rate > ")
.append(base).append(" ")
.append(tag).append("/s")
.toString();
return "Rate > " +
base + " " +
tag + "/s" + "(>" + condition + "%)";
case BASELINE_CONDITION_TYPE:
return new StrBuilder()
.append(tag).append(" > ")
.append(percent).append(" of baseline")
.toString();
return tag + " > " +
percent + " of baseline";
case SENSITIVITY_CONDITION_TYPE:
return new StrBuilder()
.append(sessions).append(" ")
.append(tag).append("/s Unusually high ")
.append(StringUtils.capitalize(tag))
.toString();
return String.valueOf(sessions) + " " +
tag + "/s Unusually high " +
StringUtils.capitalize(tag);
default:
throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]");
}
@@ -220,8 +267,8 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
String[] ipArr = sourceIpList.split(",");
HashSet<String> countrySet = new HashSet<>();
for (String ip : ipArr) {
String country = IpUtils.ipLookup.countryLookup(ip);
if (StringUtil.isNotBlank(country)){
String country = IpLookupUtils.getCountryLookup(ip);
if (StringUtil.isNotBlank(country)) {
countrySet.add(country);
}
}
@@ -248,28 +295,28 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return index;
}
public static void main(String[] args) {
// System.out.println(new DosDetection().getSourceCountryList("192.0.2.3,138.199.14.31,255.255.255.255,121.14.89.209," +
// "23.200.74.224,161.117.68.253"));
// DosDetection dosDetection = new DosDetection();
// System.out.println(dosDetection.judgeSeverity(dosDetection.getDiffPercent(499, 1000)));
}
private Double getDiffPercent(long diff, long base) {
return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
try {
return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
} catch (Exception e) {
logger.info("当前阈值为0,进行下一阈值条件判断", e);
return 0.0;
}
}
private Severity judgeSeverity(double diffPercent) {
if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold")) {
if (diffPercent >= FlowWriteConfig.BASELINE_SESSIONS_MINOR_THRESHOLD && diffPercent < FlowWriteConfig.BASELINE_SESSIONS_WARNING_THRESHOLD) {
return Severity.MINOR;
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.major.threshold")) {
} else if (diffPercent >= FlowWriteConfig.BASELINE_SESSIONS_WARNING_THRESHOLD && diffPercent < FlowWriteConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD) {
return Severity.WARNING;
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.major.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold")) {
} else if (diffPercent >= FlowWriteConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD && diffPercent < FlowWriteConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD) {
return Severity.MAJOR;
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold")) {
} else if (diffPercent >= FlowWriteConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD && diffPercent < FlowWriteConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD) {
return Severity.SEVERE;
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold")) {
} else if (diffPercent >= FlowWriteConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD) {
return Severity.CRITICAL;
} else {
return Severity.NORMAL;

View File

@@ -1,15 +1,15 @@
package com.zdjizhi.etl;
import com.zdjizhi.common.CommonConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.DosSketchLog;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
@@ -18,14 +18,15 @@ import static com.zdjizhi.sink.OutputStreamSink.outputTag;
/**
* @author 94976
*/
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String,String>, TimeWindow> {
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple3<String,String,Integer>, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
// private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
private static final Log logger = LogFactory.get();
private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
private static final String EMPTY_SOURCE_IP_IPV6 = "::";
@Override
public void process(Tuple2<String, String> keys,
public void process(Tuple3<String,String,Integer> keys,
Context context, Iterable<DosSketchLog> elements,
Collector<DosSketchLog> out) {
DosSketchLog middleResult = getMiddleResult(keys, elements);
@@ -40,20 +41,22 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
}
}
private DosSketchLog getMiddleResult(Tuple2<String, String> keys,Iterable<DosSketchLog> elements){
private DosSketchLog getMiddleResult(Tuple3<String,String,Integer> keys,Iterable<DosSketchLog> elements){
DosSketchLog midResuleLog = new DosSketchLog();
Tuple6<Long, Long, Long,String,Long,Long> values = sketchAggregate(elements);
Tuple7<Long, Long, Long,String,Long,Long,Long> values = sketchAggregate(elements);
try {
if (values != null){
midResuleLog.setAttack_type(keys.f0);
midResuleLog.setDestination_ip(keys.f1);
midResuleLog.setVsys_id(keys.f2);
midResuleLog.setSketch_start_time(values.f4);
midResuleLog.setSketch_duration(values.f5);
midResuleLog.setSource_ip(values.f3);
midResuleLog.setSketch_sessions(values.f0);
midResuleLog.setSketch_packets(values.f1);
midResuleLog.setSketch_bytes(values.f2);
midResuleLog.setCommon_recv_time(values.f6);
return midResuleLog;
}
} catch (Exception e){
@@ -62,16 +65,22 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
return null;
}
private Tuple6<Long, Long, Long,String,Long,Long> sketchAggregate(Iterable<DosSketchLog> elements){
private Tuple7<Long, Long, Long,String,Long,Long,Long> sketchAggregate(Iterable<DosSketchLog> elements){
long sessions = 0;
long packets = 0 ;
long bytes = 0;
long startTime = System.currentTimeMillis()/1000;
long endTime = System.currentTimeMillis()/1000;
long duration = 0;
long recvtime = 0;
HashSet<String> sourceIpSet = new HashSet<>();
try {
for (DosSketchLog newSketchLog : elements){
if (recvtime == 0){
recvtime = newSketchLog.getCommon_recv_time();
}else if (recvtime > newSketchLog.getCommon_recv_time()){
recvtime = newSketchLog.getCommon_recv_time();
}
String sourceIp = newSketchLog.getSource_ip();
if (StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV6)){
sessions += newSketchLog.getSketch_sessions();
@@ -81,14 +90,15 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime;
duration = endTime - startTime == 0 ? 5 : endTime - startTime;
}else {
if (sourceIpSet.size() < CommonConfig.SOURCE_IP_LIST_LIMIT){
if (sourceIpSet.size() < FlowWriteConfig.SOURCE_IP_LIST_LIMIT){
sourceIpSet.add(sourceIp);
}
}
}
String sourceIpList = StringUtils.join(sourceIpSet, ",");
return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME,
bytes*8/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
return Tuple7.of(sessions/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,packets/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,
bytes*8/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration,recvtime);
}catch (Exception e){
logger.error("聚合中间结果集失败 {}",e);
}

View File

@@ -1,23 +1,24 @@
package com.zdjizhi.etl;
import com.zdjizhi.common.CommonConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.geedgenetworks.utils.DateUtils;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.DosBaselineThreshold;
import com.zdjizhi.utils.DateUtils;
import com.zdjizhi.utils.HbaseUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
public class ParseBaselineThreshold {
private static final Logger logger = LoggerFactory.getLogger(ParseBaselineThreshold.class);
private static final Log logger = LogFactory.get();
private static ArrayList<String> floodTypeList = new ArrayList<>();
private static Table table = null;
@@ -33,21 +34,23 @@ public class ParseBaselineThreshold {
private static void prepareHbaseEnv() throws IOException {
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
config.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM);
config.set("hbase.client.retries.number", "3");
config.set("hbase.bulkload.retries.number", "3");
config.set("zookeeper.recovery.retry", "3");
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
config.set("hbase.defaults.for.version", "2.2.3");
config.set("hbase.defaults.for.version.skip", "true");
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, FlowWriteConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, FlowWriteConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
TableName tableName = TableName.valueOf(FlowWriteConfig.HBASE_BASELINE_TABLE_NAME);
Connection conn = ConnectionFactory.createConnection(config);
table = conn.getTable(tableName);
long currentTimeMillis = System.currentTimeMillis();
scan = new Scan()
.setAllowPartialResults(true)
.setTimeRange(DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(CommonConfig.HBASE_BASELINE_TTL)).getTime(), currentTimeMillis)
.setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
.setTimeRange(DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(FlowWriteConfig.HBASE_BASELINE_TTL)).getTime(), currentTimeMillis)
.setLimit(FlowWriteConfig.HBASE_BASELINE_TOTAL_NUM);
logger.info("连接hbase成功正在读取baseline数据");
}
@@ -81,29 +84,4 @@ public class ParseBaselineThreshold {
}
return baselineMap;
}
public static void main(String[] args) {
long currentTimeMillis = System.currentTimeMillis();
long p200D = DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(CommonConfig.HBASE_BASELINE_TTL)).getTime();
System.out.println(p200D);
System.out.println(currentTimeMillis);
System.out.println(currentTimeMillis - p200D);
Map<String, Map<String, DosBaselineThreshold>> baselineMap = readFromHbase();
Set<String> keySet = baselineMap.keySet();
for (String key : keySet) {
Map<String, DosBaselineThreshold> stringTuple2Map = baselineMap.get(key);
Set<String> strings = stringTuple2Map.keySet();
for (String s:strings){
DosBaselineThreshold dosBaselineThreshold = stringTuple2Map.get(s);
System.out.println(key+"---"+s+"---"+dosBaselineThreshold);
}
}
System.out.println(baselineMap.size());
}
}

View File

@@ -1,12 +1,11 @@
package com.zdjizhi.etl;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.utils.StringUtil;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.source.DosSketchSource;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -15,8 +14,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.*;
/**
* @author wlh
@@ -24,22 +22,19 @@ import java.util.HashMap;
public class ParseSketchLog {
private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
public static SingleOutputStreamOperator<DosSketchLog> getSketchSource() {
return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy());
}
private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource() {
return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog());
}
private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){
private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy() {
return WatermarkStrategy
.<DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(CommonConfig.FLINK_WATERMARK_MAX_ORDERNESS))
.<DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
.withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
}
@@ -47,17 +42,26 @@ public class ParseSketchLog {
@Override
public void flatMap(String s, Collector<DosSketchLog> collector) {
try {
if (StringUtil.isNotBlank(s)){
HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
if (StringUtil.isNotBlank(s)) {
final long recv_time = System.currentTimeMillis()/1000;
HashMap<String, Object> sketchSource = JSONObject.parseObject(s, HashMap.class);
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
String attackType = sketchSource.get("attack_type").toString();
ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list"));
ArrayList<HashMap<String, Object>> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class);
for (HashMap<String, Object> obj : reportIpList) {
DosSketchLog dosSketchLog = new DosSketchLog();
dosSketchLog.setCommon_recv_time(recv_time);
dosSketchLog.setSketch_start_time(sketchStartTime);
dosSketchLog.setSketch_duration(sketchDuration);
dosSketchLog.setAttack_type(attackType);
dosSketchLog.setVsys_id(vsysId);
String sourceIp = obj.get("source_ip").toString();
String destinationIp = obj.get("destination_ip").toString();
long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
@@ -69,19 +73,12 @@ public class ParseSketchLog {
dosSketchLog.setSketch_packets(sketchPackets);
dosSketchLog.setSketch_bytes(sketchBytes);
collector.collect(dosSketchLog);
logger.debug("数据解析成功:{}",dosSketchLog.toString());
logger.debug("数据解析成功:{}", dosSketchLog.toString());
}
}
} catch (Exception e) {
logger.error("数据解析错误:{} \n{}",s,e);
logger.error("数据解析错误:{} \n{}", s, e);
}
}
}
public static void main(String[] args) throws Exception {
flatSketchSource().print();
FlinkEnvironmentUtils.streamExeEnv.execute();
}
}

View File

@@ -1,38 +1,39 @@
package com.zdjizhi.etl;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.DosDetectionThreshold;
import com.zdjizhi.common.DosVsysId;
import com.zdjizhi.utils.HttpClientUtils;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.NacosUtils;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.message.BasicHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author wlh
*/
public class ParseStaticThreshold {
private static Logger logger = LoggerFactory.getLogger(ParseStaticThreshold.class);
private static final Log logger = LogFactory.get();
private static String encryptpwd;
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
static {
//加载加密登录密码
encryptpwd = getEncryptpwd();
@@ -44,17 +45,18 @@ public class ParseStaticThreshold {
private static String getEncryptpwd() {
String psw = HttpClientUtils.ERROR_MESSAGE;
try {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("password", "admin");
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build());
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
psw = data.get("encryptpwd").toString();
} else {
logger.error(msg);
@@ -68,38 +70,6 @@ public class ParseStaticThreshold {
return psw;
}
/**
* 登录bifang服务获取token
*
* @return token
*/
private static String loginBifangServer() {
String token = HttpClientUtils.ERROR_MESSAGE;
try {
if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)) {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("username", "admin");
parms.put("password", encryptpwd);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms);
String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
token = data.get("token").toString();
} else {
logger.error(msg);
}
}
}
} catch (Exception e) {
logger.error("登录失败,未获取到token ", e);
}
return token;
}
/**
* 获取vsysId配置列表
@@ -109,26 +79,28 @@ public class ParseStaticThreshold {
private static ArrayList<DosVsysId> getVsysId() {
ArrayList<DosVsysId> vsysIdList = null;
try {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("pageSize", -1);
parms.put("orderBy", "vsysId desc");
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms);
String token = NacosUtils.getStringProperty("bifang.server.token");
parms.put("page_size", -1);
// parms.put("orderBy", "vsysId desc");
parms.put("type", 1);
HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms);
String token = FlowWriteConfig.BIFANG_SERVER_TOKEN;
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
Object list = data.get("list");
HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
Object list = data.get("vsys_list");
if (list != null) {
vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
logger.info("获取到vsysId{}条", vsysIdList.size());
List<DosVsysId> dosVsysIds = JSON.parseArray(JSONObject.toJSONString(list), DosVsysId.class);
vsysIdList= (ArrayList)dosVsysIds;
logger.info("获取到vsysId {}条", vsysIdList.size());
} else {
logger.warn("vsysIdList为空");
}
@@ -145,53 +117,57 @@ public class ParseStaticThreshold {
/**
* 根据vsysId获取静态阈值配置列表
*
* @return thresholds
*/
private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
ArrayList<DosDetectionThreshold> thresholds = null;
// ArrayList<DosVsysId> vsysId = getVsysId();
ArrayList<DosDetectionThreshold> vsysThresholds = new ArrayList<>();
ArrayList<DosVsysId> vsysIds = getVsysId();
try {
// if (vsysId != null){
// for (DosVsysId dosVsysId : vsysId) {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("pageSize", -1);
parms.put("orderBy", "profileId asc");
parms.put("isValid", 1);
// parms.put("vsysId", dosVsysId.getVsysId());
parms.put("vsysId", 1);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
String token = NacosUtils.getStringProperty("bifang.server.token");
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
Object list = data.get("list");
if (list != null) {
thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
logger.info("获取到静态阈值配置{}条", thresholds.size());
} else {
logger.warn("静态阈值配置为空");
if (vsysIds != null) {
for (DosVsysId dosVsysId : vsysIds) {
Integer vsysId = dosVsysId.getId() == null ? 1 : dosVsysId.getId();
Integer[] superiorIds = dosVsysId.getSuperior_ids();
URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("page_size", -1);
// parms.put("order_by", "profileId asc");
parms.put("is_valid", 1);
parms.put("vsys_id", vsysId);
HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
String token = FlowWriteConfig.BIFANG_SERVER_TOKEN;
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr,HashMap.class);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
Object list = data.get("dos_detections");
if (list != null) {
List<DosDetectionThreshold> dosDetectionThresholds = JSON.parseArray(JSONObject.toJSONString(list), DosDetectionThreshold.class);
ArrayList<DosDetectionThreshold> thresholds = (ArrayList)dosDetectionThresholds;
for (DosDetectionThreshold dosDetectionThreshold : thresholds) {
dosDetectionThreshold.setSuperior_ids(superiorIds);
vsysThresholds.add(dosDetectionThreshold);
}
logger.info("获取到vsys id是{}静态阈值配置{}条", vsysId, thresholds.size());
} else {
logger.warn("静态阈值配置为空");
}
} else {
logger.error(msg);
}
}
} else {
logger.error(msg);
}
}
}
// }
// }
} catch (Exception e) {
logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e);
}
return thresholds;
return vsysThresholds;
}
/**
@@ -199,15 +175,22 @@ public class ParseStaticThreshold {
*
* @return threshold RangeMap
*/
static HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> createStaticThreshold() {
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap = new HashMap<>(4);
static HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> createStaticThreshold() {
HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap = new HashMap<>(4);
try {
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
for (DosDetectionThreshold threshold : dosDetectionThreshold) {
String attackType = threshold.getAttackType();
TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create());
ArrayList<String> serverIpList = threshold.getServerIpList();
String attackType = threshold.getAttack_type();
int vsysId = threshold.getVsys_id();
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> rangeMap = thresholdRangeMap.getOrDefault(vsysId, new HashMap<>());
TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = rangeMap.getOrDefault(attackType, TreeRangeMap.create());
ArrayList<String> serverIpList = threshold.getServer_ip_list();
for (String sip : serverIpList) {
IPAddressString ipAddressString = new IPAddressString(sip);
if (ipAddressString.isIPAddress()) {
@@ -239,7 +222,8 @@ public class ParseStaticThreshold {
}
}
}
thresholdRangeMap.put(attackType, treeRangeMap);
rangeMap.put(attackType, treeRangeMap);
thresholdRangeMap.put(vsysId, rangeMap);
}
}
} catch (Exception e) {
@@ -248,28 +232,51 @@ public class ParseStaticThreshold {
return thresholdRangeMap;
}
public static void main(String[] args) {
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
dosDetectionThreshold.forEach(System.out::println);
System.out.println("------------------------");
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
System.out.println("------------------------");
for (String type : staticThreshold.keySet()) {
Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = staticThreshold.get(type).asMapOfRanges();
for (Range<IPAddress> range : asMapOfRanges.keySet()) {
DosDetectionThreshold threshold = asMapOfRanges.get(range);
System.out.println(type + "---" + range + "---" + threshold);
/**
* 登录bifang服务获取token
*
* @return token
*/
private static String loginBifangServer() {
String token = HttpClientUtils.ERROR_MESSAGE;
try {
final HashMap<String, Object> parmsMap = new HashMap<>();
String urlString = FlowWriteConfig.BIFANG_SERVER_URI+FlowWriteConfig.BIFANG_SERVER_LOGIN_PATH;
parmsMap.put("username","admin");
parmsMap.put("password",encryptpwd);
parmsMap.put("auth_mode","");
final String jsonInputString = JSON.toJSONString(parmsMap);
final URL url = new URL(urlString);
final HttpURLConnection connection = (HttpURLConnection)url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
connection.setRequestProperty("Accept", "application/json");
connection.setDoOutput(true);
OutputStream os = connection.getOutputStream();
os.write(jsonInputString.getBytes());
os.flush();
os.close();
int responseCode = connection.getResponseCode();
if (responseCode == 200 ) {
// 读取响应内容
BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String line;
StringBuilder response = new StringBuilder();
while ((line = reader.readLine()) != null) {
response.append(line);
}
reader.close();
HashMap<String, Object> body = JSONObject.parseObject(String.valueOf(response), HashMap.class);
final HashMap data = JSONObject.parseObject(String.valueOf( body.get("data")), HashMap.class);
token = (String) data.get("token");
}
System.out.println("------------------------");
} catch (Exception e) {
logger.error("登录失败,未获取到token ", e);
}
// String s = loginBifangServer();
// System.out.println(s);
return token;
}
}

View File

@@ -1,15 +1,15 @@
package com.zdjizhi.etl;
import com.zdjizhi.common.CommonConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.DosMetricsLog;
import com.zdjizhi.common.DosSketchLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class TrafficServerIpMetrics {
private static final Logger logger = LoggerFactory.getLogger(TrafficServerIpMetrics.class);
// private static final Logger logger = LoggerFactory.getLogger(TrafficServerIpMetrics.class);
private static final Log logger = LogFactory.get();
static DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) {
DosMetricsLog dosMetricsLog = new DosMetricsLog();
@@ -19,22 +19,18 @@ class TrafficServerIpMetrics {
dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions());
dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets());
dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes());
dosMetricsLog.setVsys_id(midResuleLog.getVsys_id());
dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip()));
logger.debug("metric 结果已加载:{}",dosMetricsLog.toString());
return dosMetricsLog;
}
private static long timeFloor(long sketchStartTime){
return sketchStartTime / CommonConfig.FLINK_WINDOW_MAX_TIME * CommonConfig.FLINK_WINDOW_MAX_TIME;
return sketchStartTime / FlowWriteConfig.FLINK_WINDOW_MAX_TIME * FlowWriteConfig.FLINK_WINDOW_MAX_TIME;
}
private static int getPartitionNumByIp(String destinationIp){
return Math.abs(destinationIp.hashCode()) % CommonConfig.DESTINATION_IP_PARTITION_NUM;
}
public static void main(String[] args) {
System.out.println(getPartitionNumByIp("146.177.223.43"));
System.out.println("146.177.223.43".hashCode());
return Math.abs(destinationIp.hashCode()) % FlowWriteConfig.DESTINATION_IP_PARTITION_NUM;
}
}

View File

@@ -1,8 +1,9 @@
package com.zdjizhi.sink;
import com.zdjizhi.common.CommonConfig;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.utils.JsonMapper;
//import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -13,9 +14,10 @@ class DosEventSink {
static void dosEventOutputSink(SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream){
dosEventLogOutputStream
.filter(Objects::nonNull)
.map(JsonMapper::toJsonString)
.addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
.setParallelism(CommonConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
// .map(JsonMapper::toJsonString)
.map(JSONObject::toJSONString)
.addSink(KafkaUtils.getKafkaSink(FlowWriteConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
.setParallelism(FlowWriteConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
}
}

View File

@@ -1,6 +1,8 @@
package com.zdjizhi.sink;
import com.zdjizhi.common.CommonConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosMetricsLog;
import com.zdjizhi.common.DosSketchLog;
@@ -9,19 +11,19 @@ import com.zdjizhi.etl.EtlProcessFunction;
import com.zdjizhi.etl.ParseSketchLog;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author 94976
*/
public class OutputStreamSink {
private static final Logger logger = LoggerFactory.getLogger(OutputStreamSink.class);
// private static final Logger logger = LoggerFactory.getLogger(OutputStreamSink.class);
private static final Log logger = LogFactory.get();
public static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics"){};
@@ -30,30 +32,33 @@ public class OutputStreamSink {
SingleOutputStreamOperator<DosSketchLog> middleStream = getMiddleStream();
DosEventSink.dosEventOutputSink(getEventSinkStream(middleStream));
TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);
FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME);
FlinkEnvironmentUtils.streamExeEnv.execute(FlowWriteConfig.STREAM_EXECUTION_JOB_NAME);
} catch (Exception e) {
logger.error("任务启动失败 {}",e);
}
}
private static SingleOutputStreamOperator<DosEventLog> getEventSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
return middleStream.map(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
return middleStream
.process(new DosDetection()).setParallelism(FlowWriteConfig.FLINK_DETECTION_MAP_PARALLELISM);
}
private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){
return ParseSketchLog.getSketchSource()
.keyBy(new KeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
.window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.FLINK_WINDOW_MAX_TIME)))
.process(new EtlProcessFunction())
.setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM);
.setParallelism(FlowWriteConfig.FLINK_FIRST_AGG_PARALLELISM);
}
private static class KeysSelector implements KeySelector<DosSketchLog, Tuple2<String, String>>{
private static class KeysSelector implements KeySelector<DosSketchLog, Tuple3<String, String, Integer>>{
@Override
public Tuple2<String, String> getKey(DosSketchLog dosSketchLog){
return Tuple2.of(
public Tuple3<String, String, Integer> getKey(DosSketchLog dosSketchLog){
return Tuple3.of(
dosSketchLog.getAttack_type(),
dosSketchLog.getDestination_ip());
dosSketchLog.getDestination_ip(),
dosSketchLog.getVsys_id());
}
}

View File

@@ -1,9 +1,10 @@
package com.zdjizhi.sink;
import com.zdjizhi.common.CommonConfig;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.DosMetricsLog;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -14,8 +15,11 @@ class TrafficServerIpMetricsSink {
static void sideOutputMetricsSink(SingleOutputStreamOperator<DosSketchLog> outputStream){
DataStream<DosMetricsLog> sideOutput = outputStream.getSideOutput(outputTag);
sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
.setParallelism(CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);
// sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
sideOutput.map(JSONObject::toJSONString).addSink(KafkaUtils.getKafkaSink(FlowWriteConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
.setParallelism(FlowWriteConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);
}
}

View File

@@ -1,6 +1,8 @@
package com.zdjizhi.source;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -18,17 +20,18 @@ public class DosSketchSource {
public static DataStreamSource<String> createDosSketchSource(){
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){
properties.setProperty("bootstrap.servers", FlowWriteConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
properties.setProperty("group.id", FlowWriteConfig.KAFKA_GROUP_ID);
if (FlowWriteConfig.SASL_JAAS_CONFIG_FLAG == 1){
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
}
return streamExeEnv.addSource(new FlinkKafkaConsumer<String>(
CommonConfig.KAFKA_INPUT_TOPIC_NAME,
FlowWriteConfig.KAFKA_INPUT_TOPIC_NAME,
new SimpleStringSchema(), properties))
.setParallelism(CommonConfig.KAFKA_INPUT_PARALLELISM);
.setParallelism(FlowWriteConfig.KAFKA_INPUT_PARALLELISM);
}
}

View File

@@ -1,23 +0,0 @@
package com.zdjizhi.utils;
import java.util.Collection;
import java.util.HashSet;
/**
* @author wlh
* 扩展集合处理工具
*/
public class CollectionUtils {
public static<T> Collection<T> takeUniqueLimit(Collection<T> collection, int limit){
int i =0;
Collection<T> newSet = new HashSet<>();
for (T t:collection){
if (i < limit){
newSet.add(t);
i += 1;
}
}
return newSet;
}
}

View File

@@ -1,9 +1,9 @@
package com.zdjizhi.utils;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -17,7 +17,8 @@ import java.util.concurrent.locks.Lock;
public class DistributedLock implements Lock, Watcher {
private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
// private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
private static final Log logger = LogFactory.get();
private ZooKeeper zk = null;
/**

View File

@@ -1,8 +1,6 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -13,7 +11,7 @@ public class FlinkEnvironmentUtils {
public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
static {
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
streamExeEnv.setParallelism(FlowWriteConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
/*
// 每 1000ms 开始一次 checkpoint

View File

@@ -1,11 +1,17 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.geedgenetworks.utils.StringUtil;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.http.*;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.ConnectTimeoutException;
@@ -18,8 +24,6 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
@@ -38,7 +42,8 @@ public class HttpClientUtils {
/** 全局连接池对象 */
private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager();
private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
// private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
private static final Log logger = LogFactory.get();
public static final String ERROR_MESSAGE = "-1";
/*
@@ -47,9 +52,9 @@ public class HttpClientUtils {
static {
// 设置最大连接数
CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
CONN_MANAGER.setMaxTotal(FlowWriteConfig.HTTP_POOL_MAX_CONNECTION);
// 设置每个连接的路由数
CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
CONN_MANAGER.setDefaultMaxPerRoute(FlowWriteConfig.HTTP_POOL_MAX_PER_ROUTE);
}
@@ -61,11 +66,11 @@ public class HttpClientUtils {
// 创建Http请求配置参数
RequestConfig requestConfig = RequestConfig.custom()
// 获取连接超时时间
.setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT)
.setConnectionRequestTimeout(FlowWriteConfig.HTTP_POOL_REQUEST_TIMEOUT)
// 请求超时时间
.setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT)
.setConnectTimeout(FlowWriteConfig.HTTP_POOL_CONNECT_TIMEOUT)
// 响应超时时间
.setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT)
.setSocketTimeout(FlowWriteConfig.HTTP_POOL_RESPONSE_TIMEOUT)
.build();
/*

View File

@@ -0,0 +1,182 @@
package com.zdjizhi.utils;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.*;
import com.geedgenetworks.utils.IpLookupV2;
import com.geedgenetworks.utils.StringUtil;
import com.google.common.base.Joiner;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.pojo.KnowlegeBaseMeta;
import com.zdjizhi.utils.connections.http.HttpClientService;
import org.apache.http.client.utils.URIBuilder;
import java.io.ByteArrayInputStream;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
/**
* @author wangchengcheng
* @version 2023/11/10 15:23
*/
public class IpLookupUtils {
private static final Log logger = LogFactory.get();
private static final String ipBuiltInName = "ip_builtin.mmdb";
private static final String ipUserDefinedName = "ip_user_defined.mmdb";
/**
* ip定位库
*/
private static IpLookupV2 ipLookup;
/**
* 定位库默认分隔符
*/
private static final String LOCATION_SEPARATOR = ".";
/**
* 最大重试次数
*/
private static final int TRY_TIMES = 5;
/**
* http connections
*/
private static final HttpClientService httpClientService;
/**
* 定位库元数据缓存
*/
private static final HashMap<String, KnowlegeBaseMeta> knowledgeMetaCache = new HashMap<>(16);
private static String currentSha256IpUserDefined = "";
private static String currentSha256IpBuiltin = "";
static {
httpClientService = new HttpClientService();
stuffKnowledgeMetaCache();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
stuffKnowledgeMetaCache();
}
}, 0, FlowWriteConfig.KNOWLEDGE_EXECUTION_INTERVAL);
}
private static void stuffKnowledgeMetaCache(){
final KnowlegeBaseMeta ipBuiltinknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_BUILTIN_KD_ID);
if (!currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256())) {
String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipBuiltinknowlegeBaseMeta.getName(), ipBuiltinknowlegeBaseMeta.getFormat());
knowledgeMetaCache.put(fileName, ipBuiltinknowlegeBaseMeta);
}
final KnowlegeBaseMeta ipUserDefinedknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_USER_DEFINED_KD_ID);
if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256())) {
String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipUserDefinedknowlegeBaseMeta.getName(), ipUserDefinedknowlegeBaseMeta.getFormat());
knowledgeMetaCache.put(fileName, ipUserDefinedknowlegeBaseMeta);
}
if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256()) || !currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256())) {
currentSha256IpBuiltin = ipBuiltinknowlegeBaseMeta.getSha256();
currentSha256IpUserDefined = ipUserDefinedknowlegeBaseMeta.getSha256();
reloadIpLookup();
}
}
/**
* 从HDFS下载文件更新IpLookup
*/
private static void reloadIpLookup() {
IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
for (String fileName : knowledgeMetaCache.keySet()) {
int retryNum = 0;
KnowlegeBaseMeta knowlegeBaseMeta = knowledgeMetaCache.get(fileName);
String metaSha256 = knowlegeBaseMeta.getSha256();
while (retryNum < TRY_TIMES) {
System.out.println("download file " + fileName + ",HOS path :" + knowlegeBaseMeta.getPath());
Long startTime = System.currentTimeMillis();
byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT);
if (httpGetByte != null && httpGetByte.length > 0) {
String downloadFileSha256 = DigestUtil.sha256Hex(httpGetByte);
if (metaSha256.equals(downloadFileSha256)) {
ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte);
switch (fileName) {
case ipBuiltInName:
builder.loadDataFile(inputStream);
break;
case ipUserDefinedName:
builder.loadDataFilePrivate(inputStream);
break;
default:
}
System.out.println("update " + fileName + " finished, speed :" + (System.currentTimeMillis() - startTime) + "ms");
retryNum = TRY_TIMES;
} else {
logger.error("通过HOS下载{}的sha256为:{} ,网关内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256, metaSha256, retryNum);
retryNum++;
}
} else {
logger.error("通过HOS下载{}的流为空 ,开始第{}次重试下载文件", fileName, retryNum);
retryNum++;
}
}
}
ipLookup = builder.build();
}
/**
* 根据配置组合生成知识库元数据过滤参数
*
* @return 过滤参数
*/
private static String getFilterParameter() {
String expr = "[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined'))]";
return expr;
}
public static String getCountryLookup(String ip) {
return ipLookup.countryLookup(ip);
}
private static KnowlegeBaseMeta getKnowlegeBaseMeta(String kd_id) {
KnowlegeBaseMeta knowlegeBaseMeta = null;
String knowledgeInfo = null;
try {
URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.KNOWLEDGE_BASE_URL);
HashMap<String, Object> parms = new HashMap<>();
parms.put("kb_id", kd_id);
HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.KNOWLEDGE_BASE_PATH, parms);
knowledgeInfo = HttpClientUtils.httpGet(uriBuilder.build());
if (knowledgeInfo.contains("200")) {
final Map<String, Object> jsonObject = JSONObject.parseObject(knowledgeInfo, Map.class);
JSONPath jsonPath = JSONPath.of(getFilterParameter());
String extract = jsonPath.extract(JSONReader.of(jsonObject.get("data").toString())).toString();
if (StringUtil.isNotBlank(extract)) {
JSONArray jsonArray = JSON.parseArray(extract);
if (jsonArray.size() > 0) {
for (int i = 0; i < jsonArray.size(); i++) {
knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class);
}
}
}
} else {
logger.error("获取knowledge_base失败,请求回执为" + knowledgeInfo);
}
} catch (URISyntaxException e) {
logger.error("构造URI异常", e);
} catch (Exception e) {
logger.error("获取knowledge_base失败", e);
}
return knowlegeBaseMeta;
}
}

View File

@@ -1,29 +0,0 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
public class IpUtils {
/**
* IP定位库工具类
*/
public static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
.loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4_built_in.mmdb")
.loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6_built_in.mmdb")
.loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_v4_user_defined.mmdb")
.loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_v6_user_defined.mmdb")
.build();
public static void main(String[] args) {
System.out.println(ipLookup.countryLookup("49.7.115.37"));
String ips = "182.168.50.23,182.168.50.45,182.168.56.9,182.168.56.8,92.168.50.58,19.168.56.7,12.168.56.6,2.168.50.40,1.168.50.19,9.168.50.6,2.168.50.4,192.168.56.17,192.168.50.27,192.168.50.26,192.168.50.18,192.168.56.3,192.168.56.10";
for (String ip:ips.split(",")){
System.out.println(ip+"--"+ipLookup.countryLookup(ip));
}
}
}

View File

@@ -1,6 +1,6 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
@@ -11,11 +11,11 @@ public class KafkaUtils {
private static Properties getKafkaSinkProperty(){
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){
properties.setProperty("bootstrap.servers", FlowWriteConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
if (FlowWriteConfig.SASL_JAAS_CONFIG_FLAG == 1){
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
}
return properties;

View File

@@ -1,90 +0,0 @@
package com.zdjizhi.utils;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.concurrent.Executor;
public class NacosUtils {
private static final Logger logger = LoggerFactory.getLogger(NacosUtils.class);
private static Properties nacosProperties = new Properties();
private static Properties commonProperties = new Properties();
private static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr");
private static final String NACOS_NAMESPACE = CommonConfigurations.getStringProperty("nacos.namespace");
private static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username");
private static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password");
private static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id");
private static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group");
private static final long NACOS_READ_TIMEOUT = CommonConfigurations.getLongProperty("nacos.read.timeout");
static {
createConfigService();
}
private static void getProperties() {
nacosProperties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_SERVER_ADDR);
nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, NACOS_NAMESPACE);
nacosProperties.setProperty(PropertyKeyConst.USERNAME, NACOS_USERNAME);
nacosProperties.setProperty(PropertyKeyConst.PASSWORD, NACOS_PASSWORD);
}
private static void createConfigService() {
try {
getProperties();
ConfigService configService = NacosFactory.createConfigService(nacosProperties);
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
commonProperties.load(new StringReader(config));
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
try {
commonProperties.clear();
commonProperties.load(new StringReader(configMsg));
} catch (IOException e) {
logger.error("监听nacos配置失败", e);
}
System.out.println(configMsg);
}
});
} catch (Exception e) {
e.printStackTrace();
logger.error("获取nacos配置失败", e);
}
}
public static String getStringProperty(String key) {
return commonProperties.getProperty(key);
}
public static Integer getIntProperty(String key) {
return Integer.parseInt(commonProperties.getProperty(key));
}
public static Double getDoubleProperty(String key) {
return Double.parseDouble(commonProperties.getProperty(key));
}
public static Long getLongProperty(String key) {
return Long.parseLong(commonProperties.getProperty(key));
}
public static Boolean getBooleanProperty(String key) {
return "true".equals(commonProperties.getProperty(key).toLowerCase().trim());
}
}

View File

@@ -1,11 +1,12 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
public class SnowflakeId {
private static final Logger logger = LoggerFactory.getLogger(SnowflakeId.class);
// private static final Logger logger = LoggerFactory.getLogger(SnowflakeId.class);
private static final Log logger = LogFactory.get();
/**
* 共64位 第一位为符号位 默认0
@@ -98,7 +99,7 @@ public class SnowflakeId {
private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
static {
idWorker = new SnowflakeId(CommonConfig.HBASE_ZOOKEEPER_QUORUM, CommonConfig.DATA_CENTER_ID_NUM);
idWorker = new SnowflakeId(FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM, FlowWriteConfig.DATA_CENTER_ID_NUM);
}
//==============================Constructors=====================================
@@ -107,7 +108,7 @@ public class SnowflakeId {
* 构造函数
*/
private SnowflakeId(String zookeeperIp, long dataCenterIdNum) {
DistributedLock lock = new DistributedLock(CommonConfig.HBASE_ZOOKEEPER_QUORUM, "disLocks1");
DistributedLock lock = new DistributedLock(FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM, "disLocks1");
try {
lock.lock();
int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp);

View File

@@ -1,11 +1,11 @@
package com.zdjizhi.utils;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
@@ -13,7 +13,8 @@ import java.util.concurrent.CountDownLatch;
public class ZookeeperUtils implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperUtils.class);
// private static final Logger logger = LoggerFactory.getLogger(ZookeeperUtils.class);
private static final Log logger = LogFactory.get();
private ZooKeeper zookeeper;

View File

@@ -0,0 +1,261 @@
package com.zdjizhi.utils.connections.http;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.geedgenetworks.utils.StringUtil;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.exception.FlowWriteException;
import org.apache.commons.io.IOUtils;
import org.apache.http.*;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import javax.net.ssl.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
public class HttpClientService {
private static final Log log = LogFactory.get();
/**
* 在调用SSL之前需要重写验证方法取消检测SSL
* 创建ConnectionManager添加Connection配置信息
*
* @return HttpClient 支持https
*/
private PoolingHttpClientConnectionManager getSslClientManager() {
try {
// 在调用SSL之前需要重写验证方法取消检测SSL
X509TrustManager trustManager = new X509TrustManager() {
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
@Override
public void checkClientTrusted(X509Certificate[] xcs, String str) {
}
@Override
public void checkServerTrusted(X509Certificate[] xcs, String str) {
}
};
SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
ctx.init(null, new TrustManager[]{trustManager}, null);
SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE);
Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.INSTANCE)
.register("https", socketFactory).build();
// 创建ConnectionManager添加Connection配置信息
PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
// 设置最大连接数
connManager.setMaxTotal(FlowWriteConfig.HTTP_POOL_MAX_CONNECTION);
// 设置每个连接的路由数
connManager.setDefaultMaxPerRoute(FlowWriteConfig.HTTP_POOL_MAX_PER_ROUTE);
return connManager;
} catch (KeyManagementException | NoSuchAlgorithmException e) {
throw new FlowWriteException(e.getMessage());
}
}
/**
* 获取Http客户端连接对象
*
* @param socketTimeOut 响应超时时间
* @return Http客户端连接对象
*/
private CloseableHttpClient getHttpClient(int socketTimeOut) {
// 创建Http请求配置参数
RequestConfig requestConfig = RequestConfig.custom()
// 获取连接超时时间
.setConnectionRequestTimeout(FlowWriteConfig.HTTP_POOL_REQUEST_TIMEOUT)
// 请求超时时间
.setConnectTimeout(FlowWriteConfig.HTTP_POOL_CONNECT_TIMEOUT)
// 响应超时时间
.setSocketTimeout(socketTimeOut)
.build();
/**
* 测出超时重试机制为了防止超时不生效而设置
* 如果直接放回false,不重试
* 这里会根据情况进行判断是否重试
*/
HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
if (executionCount >= 3) {// 如果已经重试了3次就放弃
return false;
}
if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
return true;
}
if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
return false;
}
if (exception instanceof SocketTimeoutException) {
if (exception.getMessage().contains("Read timed out")) {
return false;
}
}
if (exception instanceof UnknownHostException) {// 目标服务器不可达
return false;
}
if (exception instanceof ConnectTimeoutException) {// 连接被拒绝
return false;
}
if (exception instanceof SSLException) {// ssl握手异常
return false;
}
if (exception instanceof InterruptedIOException) {// 超时
return true;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
// 如果请求是幂等的,就再次尝试
if (!(request instanceof HttpEntityEnclosingRequest)) {
return true;
}
return false;
};
ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator
(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && param.equalsIgnoreCase("timeout")) {
return Long.parseLong(value) * 1000;
}
}
return 60 * 1000;//如果没有约定则默认定义时长为60s
};
// 创建httpClient
return HttpClients.custom()
// 把请求相关的超时信息设置到连接客户端
.setDefaultRequestConfig(requestConfig)
// 把请求重试设置到连接客户端
.setRetryHandler(retry)
.setKeepAliveStrategy(myStrategy)
// 配置连接池管理对象
.setConnectionManager(getSslClientManager())
.build();
}
public InputStream httpGetInputStream(String url, int socketTimeout, Header... headers) {
InputStream result = null;
// 获取客户端连接对象
CloseableHttpClient httpClient = getHttpClient(socketTimeout);
// 创建GET请求对象
HttpGet httpGet = new HttpGet(url);
if (StringUtil.isNotEmpty(headers)) {
for (Header h : headers) {
httpGet.addHeader(h);
}
}
CloseableHttpResponse response = null;
try {
// 执行请求
response = httpClient.execute(httpGet);
// 获取响应实体
result = IOUtils.toBufferedInputStream(response.getEntity().getContent());
// 获取响应信息
EntityUtils.consume(response.getEntity());
} catch (ClientProtocolException e) {
log.error("current file: {},Protocol error:{}", url, e.getMessage());
} catch (ParseException e) {
log.error("current file: {}, Parser error:{}", url, e.getMessage());
} catch (IOException e) {
log.error("current file: {},IO error:{}", url, e.getMessage());
} finally {
if (null != response) {
try {
EntityUtils.consume(response.getEntity());
response.close();
} catch (IOException e) {
log.error("Release Connection error:{}", e.getMessage());
}
}
return result;
}
}
public byte[] httpGetByte(String url, int socketTimeout, Header... headers) {
byte[] result = null;
// 获取客户端连接对象
CloseableHttpClient httpClient = getHttpClient(socketTimeout);
// 创建GET请求对象
HttpGet httpGet = new HttpGet(url);
if (StringUtil.isNotEmpty(headers)) {
for (Header h : headers) {
httpGet.addHeader(h);
}
}
CloseableHttpResponse response = null;
try {
// 执行请求
response = httpClient.execute(httpGet);
// 获取响应实体
result = IOUtils.toByteArray(response.getEntity().getContent());
// 获取响应信息
EntityUtils.consume(response.getEntity());
} catch (ClientProtocolException e) {
log.error("current file: {},Protocol error:{}", url, e.getMessage());
} catch (ParseException e) {
log.error("current file: {}, Parser error:{}", url, e.getMessage());
} catch (IOException e) {
log.error("current file: {},IO error:{}", url, e.getMessage());
} finally {
if (null != response) {
try {
EntityUtils.consume(response.getEntity());
response.close();
} catch (IOException e) {
log.error("Release Connection error:{}", e.getMessage());
}
}
return result;
}
}
}

View File

@@ -0,0 +1,13 @@
package com.zdjizhi.utils.exception;
public class FlowWriteException extends RuntimeException {
public FlowWriteException() {
}
public FlowWriteException(String message) {
super(message);
}
}

View File

@@ -5,7 +5,7 @@ stream.execution.environment.parallelism=1
stream.execution.job.name=DOS-DETECTION-APPLICATION
#输入kafka并行度大小
kafka.input.parallelism=1
kafka.input.parallelism=3
#输入kafka topic名
kafka.input.topic.name=DOS-SKETCH-RECORD
@@ -15,22 +15,22 @@ kafka.input.topic.name=DOS-SKETCH-RECORD
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#读取kafka group id
kafka.input.group.id=2112080949
kafka.input.group.id=dos-detection-job-221125-23132
#kafka.input.group.id=dos-detection-job-210813-1
#发送kafka metrics并行度大小
kafka.output.metric.parallelism=1
kafka.output.metric.parallelism=3
#发送kafka metrics topic名
kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
#kafka.output.metric.topic.name=test
#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
kafka.output.metric.topic.name=test
#发送kafka event并行度大小
kafka.output.event.parallelism=1
kafka.output.event.parallelism=3
#发送kafka event topic名
kafka.output.event.topic.name=DOS-EVENT
#kafka.output.event.topic.name=storm-dos-test
#kafka.output.event.topic.name=DOS-EVENT
kafka.output.event.topic.name=dos-test
#kafka输出地址
kafka.output.bootstrap.servers=192.168.44.12:9094
@@ -38,6 +38,7 @@ kafka.output.bootstrap.servers=192.168.44.12:9094
#zookeeper地址
hbase.zookeeper.quorum=192.168.44.12:2181
#hbase.zookeeper.quorum=192.168.40.151:2181,192.168.40.152:2181,192.168.40.203:2181
#hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
#hbase客户端处理时间
@@ -51,7 +52,7 @@ hbase.baseline.table.name=dos:ddos_traffic_baselines
hbase.baseline.total.num=1000000
#baseline ttl单位
hbase.baseline.ttl=30
hbase.baseline.ttl=10
#设置聚合并行度2个key
flink.first.agg.parallelism=1
@@ -60,10 +61,10 @@ flink.first.agg.parallelism=1
flink.detection.map.parallelism=1
#watermark延迟
flink.watermark.max.orderness=10
flink.watermark.max.orderness=300
#计算窗口大小默认600s
flink.window.max.time=10
flink.window.max.time=60
#dos event结果中distinct source IP限制
source.ip.list.limit=10000
@@ -73,30 +74,6 @@ destination.ip.partition.num=10000
data.center.id.num=15
#IP mmdb库路径
ip.mmdb.path=D:\\data\\dat\\
#ip.mmdb.path=/home/bigdata/topology/dat/
#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
#bifang服务访问地址
bifang.server.uri=http://192.168.44.72:80
#bifang.server.uri=http://192.168.44.3:80
#访问bifang只读权限tokenbifang内置无需修改
bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867
#加密密码路径信息
bifang.server.encryptpwd.path=/v1/user/encryptpwd
#登录bifang服务路径信息
bifang.server.login.path=/v1/user/login
#获取vaysId路径信息
bifang.server.policy.vaysid.path=/v1/system/vsys/
#获取静态阈值路径信息
bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold
#http请求相关参数
#最大连接数
http.pool.max.connection=400
@@ -122,17 +99,51 @@ baseline.threshold.schedule.days=1
#kafka用户认证配置参数
sasl.jaas.config.user=admin
#sasl.jaas.config.password=galaxy2019
#sasl.jaas.config.password=ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)
sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#是否开启kafka用户认证配置10
sasl.jaas.config.flag=1
#nacos配置
nacos.server.addr=192.168.44.12:8848
nacos.namespace=test
nacos.username=nacos
nacos.password=nacos
nacos.data.id=dos_detection.properties
nacos.group=Galaxy
nacos.read.timeout=5000
http.socket.timeout=90000
############################## Knowledge Base 配置 ######################################
knowledge.execution.interval=30000
knowledge.base.uri=http://192.168.44.12:9999
knowledge.base.path=/v1/knowledge_base
ip.user.defined.kd.id=004390bc-3135-4a6f-a492-3662ecb9e289
ip.builtin.kd.id=64af7077-eb9b-4b8f-80cf-2ceebc89bea9
############################## Bifang Server 配置 ######################################
bifang.server.token=aa2bdec5518ad131f71944b13ce5c298&1&
#bifang服务访问地址
bifang.server.uri=http://192.168.44.72
#bifang.server.uri=http://192.168.44.3:80
#加密密码路径信息
bifang.server.encryptpwd.path=/v1/user/encryptpwd
#登录bifang服务路径信息
bifang.server.login.path=/v1/user/login
#获取vaysId路径信息
bifang.server.policy.vaysid.path=/v1/admin/vsys
#获取静态阈值路径信息
bifang.server.policy.threshold.path=/v1/policy/profile/dos_detection
############################## 基线 配置 ######################################
static.sensitivity.threshold=1
#基线敏感阈值
baseline.sensitivity.threshold=0.2
#基于baseline判定dos攻击的上下限
baseline.sessions.minor.threshold=0.2
baseline.sessions.warning.threshold=1
baseline.sessions.major.threshold=2.5
baseline.sessions.severe.threshold=5
baseline.sessions.critical.threshold=8

View File

@@ -0,0 +1,23 @@
#Log4j
log4j.rootLogger=info,console,file
# 控制台日志设置
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=info
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# 文件日志设置
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=info
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
#路径请用相对路径,做好相关测试输出到应用目下
log4j.appender.file.file=${nis.root}/log/flink-dos-detection.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
#MyBatis 配置com.nis.web.dao是mybatis接口所在包
log4j.logger.com.nis.web.dao=debug
#bonecp数据源配置
log4j.category.com.jolbox=debug,console

View File

@@ -0,0 +1,50 @@
package com.zdjizhi.Http;
import com.alibaba.fastjson2.JSON;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.HttpClientUtils;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap;
public class HttpTest {
public static void main(String[] args) {
String token = HttpClientUtils.ERROR_MESSAGE;
try {
String urlString = FlowWriteConfig.BIFANG_SERVER_URI+"/v1/user/encryptpwd";
final HashMap<String, Object> parmsMap = new HashMap<>();
parmsMap.put("username","admin");
final String jsonInputString = JSON.toJSONString(parmsMap);
System.out.println("URL:"+urlString);
System.out.println("parmsString:"+jsonInputString);
final URL url = new URL(urlString);
final HttpURLConnection connection = (HttpURLConnection)url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
connection.setRequestProperty("Accept", "application/json");
connection.setDoOutput(true);
OutputStream os = connection.getOutputStream();
os.write(jsonInputString.getBytes());
os.flush();
os.close();
int responseCode = connection.getResponseCode();
System.out.println("Response Code: " + responseCode);
} catch (Exception e) {
System.out.println("失败");
}
}
}

View File

@@ -0,0 +1,8 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277141, sketch_duration=59, attack_type='DNS Flood', source_ip='23.91.128.115', destination_ip='102.219.30.33', sketch_sessions=945, sketch_packets=945, sketch_bytes=446370, vsys_id=23}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277205, sketch_duration=86, attack_type='DNS Flood', source_ip='172.217.160.68', destination_ip='10.113.83.88', sketch_sessions=730, sketch_packets=730, sketch_bytes=344575, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277244, sketch_duration=47, attack_type='DNS Flood', source_ip='45.135.144.112', destination_ip='42.62.192.132', sketch_sessions=0, sketch_packets=0, sketch_bytes=47, vsys_id=1}
--DosDetectionThreshold
{profileId='6091', attackType='DNS Flood', serverIpList=[113.113.83.213, 42.62.192.132/28, 10.113.83.1/25, 102.219.30.33/29], serverIpAddr='null', packetsPerSec=1, bitsPerSec=1, sessionsPerSec=1, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}
{profileId='5679', attackType='DNS Flood', serverIpList=[102.219.30.33], serverIpAddr='null', packetsPerSec=500, bitsPerSec=1000000, sessionsPerSec=100000, isValid=1, vsysId=23, superiorIds=[4, 5]}

View File

@@ -0,0 +1,6 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277232, sketch_duration=59, attack_type='ICMP Flood', source_ip='45.170.244.25', destination_ip='24.152.57.56', sketch_sessions=499, sketch_packets=499, sketch_bytes=111970, vsys_id=1}
--DosDetectionThreshold
{profileId='6093', attackType='ICMP Flood', serverIpList=[31.131.80.88/29, 24.152.57.56/29, 47.93.59.1/25], serverIpAddr='null', packetsPerSec=210, bitsPerSec=0, sessionsPerSec=0, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}

View File

@@ -0,0 +1,7 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1685003938, sketch_duration=63714, attack_type='TCP SYN Flood', source_ip='5.32.144.55', destination_ip='45.188.134.11', sketch_sessions=0, sketch_packets=0, sketch_bytes=4195, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277234, sketch_duration=57, attack_type='TCP SYN Flood', source_ip='18.65.148.128', destination_ip='23.200.74.224', sketch_sessions=54, sketch_packets=54, sketch_bytes=73427, vsys_id=1}
--DosDetectionThreshold
{profileId='6095', attackType='TCP SYN Flood', serverIpList=[23.200.74.224, 45.188.134.11/29, 41.183.0.15/29, 41.183.0.16/30], serverIpAddr='null', packetsPerSec=1, bitsPerSec=1, sessionsPerSec=1, isValid=1, vsysId=1, superiorIds=[5, 4, 12, 27]}

View File

@@ -0,0 +1,8 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277291, sketch_duration=0, attack_type='UDP Flood', source_ip='121.14.89.209', destination_ip='192.168.50.11', sketch_sessions=0, sketch_packets=0, sketch_bytes=0, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277233, sketch_duration=58, attack_type='UDP Flood', source_ip='192.168.50.56,192.168.50.34,192.168.50.11,192.168.50.33,192.168.50.55,192.168.50.58,192.168.50.36,192.168.50.14,192.168.50.35,192.168.50.13,192.168.50.57,192.168.50.30,192.168.50.51,192.168.50.54,192.168.50.10,192.168.50.32,192.168.50.53,192.168.50.31,192.168.50.16,192.168.50.38,192.168.50.15,192.168.50.37,192.168.50.18,192.168.50.17,192.168.50.50,192.168.50.45,192.168.50.23,192.168.50.22,192.168.50.44,192.168.50.25,192.168.50.47,192.168.50.46,192.168.50.24,192.168.50.63,192.168.50.41,192.168.50.40,192.168.50.62,192.168.50.43,192.168.50.21,192.168.50.20,192.168.50.42,192.168.50.27,192.168.50.26,192.168.50.48,192.168.50.28,192.168.50.61,192.168.50.60', destination_ip='121.14.89.209', sketch_sessions=297, sketch_packets=297, sketch_bytes=371404, vsys_id=1}
--DosDetectionThreshold
{profileId='5333', attackType='UDP Flood', serverIpList=[192.168.50.11, 192.168.50.12], serverIpAddr='null', packetsPerSec=50, bitsPerSec=50, sessionsPerSec=50, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}

View File

@@ -19,12 +19,12 @@ public class HbaseTest {
public static void main(String[] args) throws IOException {
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
config.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM);
config.set("hbase.client.retries.number", "3");
config.set("hbase.bulkload.retries.number", "3");
config.set("zookeeper.recovery.retry", "3");
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, FlowWriteConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, FlowWriteConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
TableName tableName = TableName.valueOf("dos_test");
Connection conn = ConnectionFactory.createConnection(config);

View File

@@ -55,7 +55,7 @@ public class NacosTest {
String content = configService.getConfig(DATA_ID, GROUP, 5000);
Properties nacosConfigMap = new Properties();
nacosConfigMap.load(new StringReader(content));
System.out.println(nacosConfigMap.getProperty("static.sensitivity.threshold"));
System.out.println(FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD);
} catch (Exception e) {
e.printStackTrace();
}

View File

@@ -0,0 +1,237 @@
package com.zdjizhi.etl;
import com.geedgenetworks.utils.StringUtil;
import com.zdjizhi.common.DosDetectionThreshold;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.utils.IpLookupUtils;
import com.zdjizhi.utils.SnowflakeId;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import java.math.BigDecimal;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashSet;
public class DosDetectionTest {
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
private final static int BASELINE_SIZE = 144;
private final static int STATIC_CONDITION_TYPE = 1;
private final static int BASELINE_CONDITION_TYPE = 2;
private final static int SENSITIVITY_CONDITION_TYPE = 3;
private final static String SESSIONS_TAG = "sessions";
private final static String PACKETS_TAG = "packets";
private final static String BITS_TAG = "bits";
@Test
public void dosDetectionTest(){
DosDetectionThreshold dosDetectionThreshold = new DosDetectionThreshold();
ArrayList<String> serverIpList = new ArrayList<>();
serverIpList.add("192.168.50.11");
serverIpList.add("192.168.50.1/24");
serverIpList.add("FC::12:0:0/54");
serverIpList.add("FC::12:0:0");
dosDetectionThreshold.setProfile_id(4437);
dosDetectionThreshold.setAttack_type("DNS Flood");
dosDetectionThreshold.setServer_ip_list(serverIpList);
dosDetectionThreshold.setSessions_per_sec(1);
dosDetectionThreshold.setPackets_per_sec(1);
dosDetectionThreshold.setBits_per_sec(100000);
dosDetectionThreshold.setIs_valid(1);
dosDetectionThreshold.setSuperior_ids(new Integer[]{5,4,12,27});
DosSketchLog dosSketchLog = new DosSketchLog ();
dosSketchLog.setSketch_sessions(68);
dosSketchLog.setSketch_packets(68);
dosSketchLog.setSketch_bytes(285820);//185.82
dosSketchLog.setVsys_id(1);
dosSketchLog.setAttack_type("ICMP Flood");
dosSketchLog.setSource_ip("45.170.244.25");
dosSketchLog.setDestination_ip("24.152.57.56");
//静态阈值获取
long sessionBase = dosDetectionThreshold.getSessions_per_sec();
long pktBase=dosDetectionThreshold.getPackets_per_sec();
long bitBase=dosDetectionThreshold.getBits_per_sec();
//基于速率进行计算
long diffSession = dosSketchLog.getSketch_sessions() - sessionBase;
long diffPkt = dosSketchLog.getSketch_packets() - pktBase;
long diffByte = dosSketchLog.getSketch_bytes() - bitBase;
Double diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
Double diffPktPercent = getDiffPercent(diffPkt, pktBase)*100;
Double diffBitPercent = getDiffPercent(diffByte, bitBase)*100;
long profileId = 0;
DosEventLog result =null;
if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent){
profileId = dosDetectionThreshold.getProfile_id();
result= getDosEventLog(dosSketchLog, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
System.out.println(result);
}else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent){
profileId = dosDetectionThreshold.getProfile_id();
result = getDosEventLog(dosSketchLog, pktBase, diffPkt,profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
System.out.println(result);
}else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent){
profileId = dosDetectionThreshold.getProfile_id();
result = getDosEventLog(dosSketchLog, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
System.out.println(result);
}
}
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
Severity severity = judgeSeverity(percent);
Integer staticSensitivityThreshold = 100;
if (severity != Severity.NORMAL) {
if (type == BASELINE_CONDITION_TYPE && percent < 0.2) {
// logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
}else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold){
// logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}",destinationIp, attackType, base, percent, value);
}else {
result = getResult(value, base, profileId, severity, percent+1, type, tag);
if (type == SENSITIVITY_CONDITION_TYPE){
result.setSeverity(Severity.MAJOR.severity);
}
// logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result);
}
}
// else {
// logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value);
// }
}
return result;
}
private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) {
DosEventLog dosEventLog = new DosEventLog();
// dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setVsys_id(value.getVsys_id());
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
dosEventLog.setProfile_id(profileId);
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
// dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag,dosEventLog));
dosEventLog.setDestination_ip(value.getDestination_ip());
// dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
dosEventLog.setSource_ip_list(ipList);
dosEventLog.setSource_country_list(getSourceCountryList(ipList));
dosEventLog.setSession_rate(value.getSketch_sessions());
dosEventLog.setPacket_rate(value.getSketch_packets());
dosEventLog.setBit_rate(value.getSketch_bytes());
return dosEventLog;
}
public String getConditions(double percent, long base, long sessions, int type, String tag,DosEventLog dosEventLog) {
int condition =0;
if ("Minor".equals(dosEventLog.getSeverity())){
condition=50;
}else if ("Warning".equals(dosEventLog.getSeverity())){
condition=100;
}else if ("Major".equals(dosEventLog.getSeverity())){
condition=250;
}else if ("Severe".equals(dosEventLog.getSeverity())){
condition=500;
}else if ("Critical".equals(dosEventLog.getSeverity())){
condition =800;
}
switch (type) {
case STATIC_CONDITION_TYPE:
return "Rate > " +
base + " " +
tag + "/s" + "(>"+condition+"%)";
case BASELINE_CONDITION_TYPE:
return tag + " > " +
PERCENT_INSTANCE.format(percent) + " of baseline";
case SENSITIVITY_CONDITION_TYPE:
return String.valueOf(sessions) + " " +
tag + "/s Unusually high " +
StringUtils.capitalize(tag);
default:
throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]");
}
}
private String getSourceCountryList(String sourceIpList) {
if (StringUtil.isNotBlank(sourceIpList)) {
String countryList;
try {
String[] ipArr = sourceIpList.split(",");
HashSet<String> countrySet = new HashSet<>();
for (String ip : ipArr) {
String country = IpLookupUtils.getCountryLookup(ip);
if (StringUtil.isNotBlank(country)){
countrySet.add(country);
}
}
countryList = StringUtils.join(countrySet, ", ");
return countryList;
} catch (Exception e) {
// logger.error("{} source IP lists 获取国家失败", sourceIpList, e);
return StringUtil.EMPTY;
}
} else {
throw new IllegalArgumentException("Illegal Argument sourceIpList = null");
}
}
private Double getDiffPercent(long diff, long base) {
return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
}
private Severity judgeSeverity(double diffPercent) {
if (diffPercent >= 0.5 && diffPercent < 1) {
return Severity.MINOR;
} else if (diffPercent >= 1 && diffPercent < 2.5) {
return Severity.WARNING;
} else if (diffPercent >= 2.5 && diffPercent < 5) {
return Severity.MAJOR;
} else if (diffPercent >= 5 && diffPercent < 8) {
return Severity.SEVERE;
} else if (diffPercent >= 8) {
return Severity.CRITICAL;
} else {
return Severity.NORMAL;
}
}
private enum Severity {
/**
* 判断严重程度枚举类型
*/
CRITICAL("Critical"),
SEVERE("Severe"),
MAJOR("Major"),
WARNING("Warning"),
MINOR("Minor"),
NORMAL("Normal");
private final String severity;
@Override
public String toString() {
return this.severity;
}
Severity(String severity) {
this.severity = severity;
}
}
}

View File

@@ -0,0 +1,11 @@
package com.zdjizhi.etl;
import org.junit.Test;
public class EtlProcessFunctionTest {
@Test
public void EtlProcessFunctionTest(){
}
}