24 Commits

Author SHA1 Message Date
unknown
24d70f690e TSG-15712 修正DoS基线阈值告警信息中告警严重程度与告警值不匹配问题 2023-06-27 17:31:56 +08:00
unknown
77e982b22f TSG-15286 静态阈值新增业务测试用例 2023-06-09 10:36:54 +08:00
unknown
b3a23686a0 GAL-352 zdjizhi 1.1.1更换为zdjizhi 1.1.3 2023-06-08 17:01:26 +08:00
unknown
b9a694ddb9 GAL-349 优化DoS检测程序知识库更新流程 2023-06-08 16:57:19 +08:00
unknown
6fb37324ff GAL-352 获取SketchLog及bifang静态阈值适配Fastjson2 2023-06-07 14:29:53 +08:00
unknown
315b638470 GAL-352 DoS检测适配Fastjson2序列化库 2023-06-06 17:53:06 +08:00
unknown
bd48417eb8 删除注释代码 2023-05-26 15:51:57 +08:00
unknown
72acc976e3 TSG-15219 修复静态阈值的condition处理逻辑,新增静态阈值单元测试类 2023-05-26 15:44:37 +08:00
unknown
6be3ea7f1e TSG-15219 优化DoS静态阈值下的检测逻辑 2023-05-24 14:36:29 +08:00
unknown
04ee45f77d TSG-15167 新增知识库文件校验功能 2023-05-23 10:38:15 +08:00
unknown
d8b0a7637b 新增命中静态阈值后填充Profile ID 2023-04-03 17:35:36 +08:00
unknown
b56a2ec31e GAL-306 修复DoS检测不能读取HDFS中IP定位库问题,支持yarn per-job运行模式 2023-03-27 17:15:27 +08:00
unknown
11747d9964 GAL-296 解决使用Yarn模式运行时的依赖冲突问题 2023-03-07 18:58:25 +08:00
wanglihui
ce15a27a1b TSG-13094 修复DoS Event日志出现MVsys id 2022-12-21 17:11:14 +08:00
wanglihui
01bbe562c9 Merge branch 'tsg-22.11' of git.mesalab.cn:bigdata/tsg/flink-dos-detection into tsg-22.11 2022-12-19 10:18:44 +08:00
wanglihui
f07651cf14 修改vsys id字段名为common_t_vsys_id 2022-12-19 10:18:17 +08:00
unknown
7c201a8a3f 新增Nacos Namespace配置,删除更新至HDFS时Flush操作 2022-12-16 16:52:33 +08:00
wanglihui
78435d54ea Merge branch 'knowledge' of https://git.mesalab.cn/bigdata/tsg/flink-dos-detection into tsg-22.11 2022-12-06 19:11:30 +08:00
wanglihui
76c9247bb9 Merge branch 'knowledge' of https://git.mesalab.cn/bigdata/tsg/flink-dos-detection into tsg-22.11 2022-12-06 19:10:28 +08:00
wanglihui
488b7c6644 修改部分日志输出 2022-12-06 17:13:09 +08:00
wanglihui
b409150532 修复提交yarn集群依赖冲突bug 2022-10-17 18:15:11 +08:00
wanglihui
7e6d5fcfc5 修复无法vsys id 2022-10-17 11:10:35 +08:00
wanglihui
859cd379e5 DoS 检测支持vsys id 2022-09-23 18:37:33 +08:00
wanglihui
47ddef9bca DoS 检测事件日志默认VSYS ID 为 1 2022-08-19 10:17:52 +08:00
35 changed files with 1197 additions and 644 deletions

83
pom.xml
View File

@@ -20,7 +20,7 @@
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.125:8099/content/groups/public</url>
<url>http://192.168.40.153:8099/content/groups/public</url>
</repository>
<repository>
@@ -98,8 +98,27 @@
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
<directory>properties</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src\main</directory>
<includes>
<include>log4j.properties</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
<dependencies>
@@ -111,17 +130,17 @@
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.slf4j</groupId>-->
<!--<artifactId>slf4j-api</artifactId>-->
<!--<version>1.7.21</version>-->
<!--</dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.slf4j</groupId>-->
<!--<artifactId>slf4j-log4j12</artifactId>-->
<!--<version>1.7.21</version>-->
<!--</dependency>-->
<dependency>
<groupId>com.jayway.jsonpath</groupId>
@@ -147,6 +166,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>zookeeper</artifactId>
@@ -160,15 +180,23 @@
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.hadoop</groupId>-->
<!-- <artifactId>hadoop-hdfs</artifactId>-->
<!-- <version>${hadoop.version}</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
@@ -185,6 +213,14 @@
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
@@ -225,7 +261,7 @@
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.1.1</version>
<version>1.1.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@@ -242,6 +278,13 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.32</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
@@ -259,6 +302,10 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->

View File

@@ -1,7 +1,6 @@
package com.zdjizhi.common;
import com.zdjizhi.utils.CommonConfigurations;
import com.zdjizhi.utils.NacosUtils;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
@@ -54,17 +53,7 @@ public class CommonConfig {
public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path");
// public static final int STATIC_SENSITIVITY_THRESHOLD = NacosUtils.getIntProperty("static.sensitivity.threshold");
// public static final double BASELINE_SENSITIVITY_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sensitivity.threshold");
//
// public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold");
// public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold");
// public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.major.threshold");
// public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold");
// public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold");
public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri");
public static final String BIFANG_SERVER_TOKEN = CommonConfigurations.getStringProperty("bifang.server.token");
public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = CommonConfigurations.getStringProperty("bifang.server.encryptpwd.path");
public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path");
public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path");
@@ -88,10 +77,12 @@ public class CommonConfig {
public static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr");
public static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username");
public static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password");
public static final String NACOS_NAMESPACE = CommonConfigurations.getStringProperty("nacos.namespace");
public static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id");
public static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group");
public static final int NACOS_READ_TIMEOUT = CommonConfigurations.getIntProperty("nacos.read.timeout");
public static final String HOS_TOKEN = CommonConfigurations.getStringProperty("hos.token");
public static final String CLUSTER_OR_SINGLE = CommonConfigurations.getStringProperty("cluster.or.single");

View File

@@ -2,13 +2,14 @@ package com.zdjizhi.common;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
/**
* @author wlh
*/
public class DosDetectionThreshold implements Serializable {
private String profileId;
private long profileId;
private String attackType;
private ArrayList<String> serverIpList;
private String serverIpAddr;
@@ -16,50 +17,14 @@ public class DosDetectionThreshold implements Serializable {
private long bitsPerSec;
private long sessionsPerSec;
private int isValid;
private int vsysId;
private Integer[] superiorIds;
@Override
public String toString() {
return "DosDetectionThreshold{" +
"profileId='" + profileId + '\'' +
", attackType='" + attackType + '\'' +
", serverIpList=" + serverIpList +
", serverIpAddr='" + serverIpAddr + '\'' +
", packetsPerSec=" + packetsPerSec +
", bitsPerSec=" + bitsPerSec +
", sessionsPerSec=" + sessionsPerSec +
", isValid=" + isValid +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DosDetectionThreshold threshold = (DosDetectionThreshold) o;
return packetsPerSec == threshold.packetsPerSec &&
bitsPerSec == threshold.bitsPerSec &&
sessionsPerSec == threshold.sessionsPerSec &&
isValid == threshold.isValid &&
Objects.equals(profileId, threshold.profileId) &&
Objects.equals(attackType, threshold.attackType) &&
Objects.equals(serverIpList, threshold.serverIpList) &&
Objects.equals(serverIpAddr, threshold.serverIpAddr);
}
@Override
public int hashCode() {
return Objects.hash(profileId, attackType, serverIpList, serverIpAddr, packetsPerSec, bitsPerSec, sessionsPerSec, isValid);
}
public String getProfileId() {
public long getProfileId() {
return profileId;
}
public void setProfileId(String profileId) {
public void setProfileId(long profileId) {
this.profileId = profileId;
}
@@ -118,4 +83,36 @@ public class DosDetectionThreshold implements Serializable {
public void setIsValid(int isValid) {
this.isValid = isValid;
}
public int getVsysId() {
return vsysId;
}
public void setVsysId(int vsysId) {
this.vsysId = vsysId;
}
public Integer[] getSuperiorIds() {
return superiorIds;
}
public void setSuperiorIds(Integer[] superiorIds) {
this.superiorIds = superiorIds;
}
@Override
public String toString() {
return "DosDetectionThreshold{" +
"profileId='" + profileId + '\'' +
", attackType='" + attackType + '\'' +
", serverIpList=" + serverIpList +
", serverIpAddr='" + serverIpAddr + '\'' +
", packetsPerSec=" + packetsPerSec +
", bitsPerSec=" + bitsPerSec +
", sessionsPerSec=" + sessionsPerSec +
", isValid=" + isValid +
", vsysId=" + vsysId +
", superiorIds=" + Arrays.toString(superiorIds) +
'}';
}
}

View File

@@ -1,13 +1,14 @@
package com.zdjizhi.common;
import java.io.Serializable;
import java.util.Objects;
public class DosEventLog implements Serializable {
public class DosEventLog implements Serializable,Cloneable {
private long log_id;
private int vsys_id;
private long start_time;
private long end_time;
private long profile_id;
private String attack_type;
private String severity;
private String conditions;
@@ -27,6 +28,14 @@ public class DosEventLog implements Serializable {
this.log_id = log_id;
}
public int getVsys_id() {
return vsys_id;
}
public void setVsys_id(int vsys_id) {
this.vsys_id = vsys_id;
}
public long getStart_time() {
return start_time;
}
@@ -43,6 +52,14 @@ public class DosEventLog implements Serializable {
this.end_time = end_time;
}
public long getProfile_id() {
return profile_id;
}
public void setProfile_id(long profile_id) {
this.profile_id = profile_id;
}
public String getAttack_type() {
return attack_type;
}
@@ -125,10 +142,12 @@ public class DosEventLog implements Serializable {
@Override
public String toString() {
return "dosEventLog{" +
return "DosEventLog{" +
"log_id=" + log_id +
", vsys_id=" + vsys_id +
", start_time=" + start_time +
", end_time=" + end_time +
", profile_id=" + profile_id +
", attack_type='" + attack_type + '\'' +
", severity='" + severity + '\'' +
", conditions='" + conditions + '\'' +
@@ -143,31 +162,7 @@ public class DosEventLog implements Serializable {
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DosEventLog)) {
return false;
}
DosEventLog that = (DosEventLog) o;
return getLog_id() == that.getLog_id() &&
getStart_time() == that.getStart_time() &&
getEnd_time() == that.getEnd_time() &&
getSession_rate() == that.getSession_rate() &&
getPacket_rate() == that.getPacket_rate() &&
getBit_rate() == that.getBit_rate() &&
Objects.equals(getAttack_type(), that.getAttack_type()) &&
Objects.equals(getSeverity(), that.getSeverity()) &&
Objects.equals(getConditions(), that.getConditions()) &&
Objects.equals(getDestination_ip(), that.getDestination_ip()) &&
Objects.equals(getDestination_country(), that.getDestination_country()) &&
Objects.equals(getSource_ip_list(), that.getSource_ip_list()) &&
Objects.equals(getSource_country_list(), that.getSource_country_list());
}
@Override
public int hashCode() {
return Objects.hash(getLog_id(), getStart_time(), getEnd_time(), getAttack_type(), getSeverity(), getConditions(), getDestination_ip(), getDestination_country(), getSource_ip_list(), getSource_country_list(), getSession_rate(), getPacket_rate(), getBit_rate());
public Object clone() throws CloneNotSupportedException {
return super.clone();
}
}

View File

@@ -12,6 +12,7 @@ public class DosMetricsLog implements Serializable {
private long packet_rate;
private long bit_rate;
private int partition_num;
private int vsys_id;
public int getPartition_num() {
return partition_num;
@@ -69,6 +70,14 @@ public class DosMetricsLog implements Serializable {
this.bit_rate = bit_rate;
}
public int getVsys_id() {
return vsys_id;
}
public void setVsys_id(int vsys_id) {
this.vsys_id = vsys_id;
}
@Override
public String toString() {
return "DosMetricsLog{" +
@@ -79,29 +88,7 @@ public class DosMetricsLog implements Serializable {
", packet_rate=" + packet_rate +
", bit_rate=" + bit_rate +
", partition_num=" + partition_num +
", vsys_id=" + vsys_id +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DosMetricsLog)) {
return false;
}
DosMetricsLog that = (DosMetricsLog) o;
return getSketch_start_time() == that.getSketch_start_time() &&
getSession_rate() == that.getSession_rate() &&
getPacket_rate() == that.getPacket_rate() &&
getBit_rate() == that.getBit_rate() &&
getPartition_num() == that.getPartition_num() &&
Objects.equals(getAttack_type(), that.getAttack_type()) &&
Objects.equals(getDestination_ip(), that.getDestination_ip());
}
@Override
public int hashCode() {
return Objects.hash(getSketch_start_time(), getAttack_type(), getDestination_ip(), getSession_rate(), getPacket_rate(), getBit_rate(), getPartition_num());
}
}

View File

@@ -15,6 +15,7 @@ public class DosSketchLog implements Serializable {
private long sketch_sessions;
private long sketch_packets;
private long sketch_bytes;
private int vsys_id;
@Override
public String toString() {
@@ -29,35 +30,10 @@ public class DosSketchLog implements Serializable {
", sketch_sessions=" + sketch_sessions +
", sketch_packets=" + sketch_packets +
", sketch_bytes=" + sketch_bytes +
", vsys_id=" + vsys_id +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DosSketchLog)) {
return false;
}
DosSketchLog sketchLog = (DosSketchLog) o;
return getSketch_start_time() == sketchLog.getSketch_start_time() &&
getSketch_duration() == sketchLog.getSketch_duration() &&
getSketch_sessions() == sketchLog.getSketch_sessions() &&
getSketch_packets() == sketchLog.getSketch_packets() &&
getSketch_bytes() == sketchLog.getSketch_bytes() &&
Objects.equals(getCommon_sled_ip(), sketchLog.getCommon_sled_ip()) &&
Objects.equals(getCommon_data_center(), sketchLog.getCommon_data_center()) &&
Objects.equals(getAttack_type(), sketchLog.getAttack_type()) &&
Objects.equals(getSource_ip(), sketchLog.getSource_ip()) &&
Objects.equals(getDestination_ip(), sketchLog.getDestination_ip());
}
@Override
public int hashCode() {
return Objects.hash(getCommon_sled_ip(), getCommon_data_center(), getSketch_start_time(), getSketch_duration(), getAttack_type(), getSource_ip(), getDestination_ip(), getSketch_sessions(), getSketch_packets(), getSketch_bytes());
}
public String getCommon_sled_ip() {
return common_sled_ip;
}
@@ -137,4 +113,12 @@ public class DosSketchLog implements Serializable {
public void setSketch_bytes(long sketch_bytes) {
this.sketch_bytes = sketch_bytes;
}
public int getVsys_id() {
return vsys_id;
}
public void setVsys_id(int vsys_id) {
this.vsys_id = vsys_id;
}
}

View File

@@ -1,22 +1,32 @@
package com.zdjizhi.common;
import java.util.Objects;
import java.util.Arrays;
public class DosVsysId {
private int vsysId;
private Integer id;
private Integer[] superiorIds;
public int getVsysId() {
return vsysId;
public Integer getId() {
return id;
}
public void setVsysId(int vsysId) {
this.vsysId = vsysId;
public void setId(Integer id) {
this.id = id;
}
public Integer[] getSuperiorIds() {
return superiorIds;
}
public void setSuperiorIds(Integer[] superiorIds) {
this.superiorIds = superiorIds;
}
@Override
public String toString() {
return "DosVsysId{" +
"vsysId=" + vsysId +
"id=" + id +
", superiorIds=" + Arrays.toString(superiorIds) +
'}';
}
}

View File

@@ -1,17 +1,18 @@
package com.zdjizhi.etl;
import cn.hutool.core.math.MathUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.*;
import com.zdjizhi.utils.*;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.text.NumberFormat;
@@ -23,12 +24,12 @@ import java.util.concurrent.TimeUnit;
/**
* @author wlh
*/
public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
public class DosDetection extends BroadcastProcessFunction<DosSketchLog,Map<String, String>, DosEventLog> {
private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);
private static final Log logger = LogFactory.get();
private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
private HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap;
private HashMap<Integer,HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap;
private final static int BASELINE_SIZE = 144;
private final static int STATIC_CONDITION_TYPE = 1;
@@ -46,6 +47,12 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build());
try {
super.open(parameters);
logger.info("begin init");
IpUtils.loadIpLook();
logger.info("init over");
executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0,
CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
@@ -58,84 +65,127 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
}
@Override
public DosEventLog map(DosSketchLog value) {
public void processElement(DosSketchLog value, ReadOnlyContext ctx, Collector<DosEventLog> out) {
DosEventLog finalResult = null;
try {
String destinationIp = value.getDestination_ip();
int vsysId = value.getVsys_id();
String key = destinationIp + "-" + vsysId;
String attackType = value.getAttack_type();
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
DosDetectionThreshold threshold = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
logger.debug("当前判断IP{}, 类型: {}", destinationIp, attackType);
if (threshold == null && baselineMap.containsKey(destinationIp)) {
finalResult = getDosEventLogByBaseline(value);
} else if (threshold == null && !baselineMap.containsKey(destinationIp)) {
DosDetectionThreshold threshold = null;
if (thresholdRangeMap.containsKey(vsysId)){
threshold = thresholdRangeMap.get(vsysId).getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
}
logger.debug("当前判断IP{}, 类型: {}", key, attackType);
if (threshold == null && baselineMap.containsKey(key)) {
finalResult = getDosEventLogByBaseline(value,key);
} else if (threshold == null && !baselineMap.containsKey(key)) {
finalResult = getDosEventLogBySensitivityThreshold(value);
} else if (threshold != null) {
finalResult = getDosEventLogByStaticThreshold(value, threshold);
} else {
logger.debug("未获取到当前server IP{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType);
logger.debug("未获取到当前server IP{} 类型 {} 静态阈值 和 baseline", key, attackType);
}
} catch (Exception e) {
logger.error("判定失败\n {} \n{}", value, e);
}
return finalResult;
if (finalResult != null){
out.collect(finalResult);
}
}
@Override
public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<DosEventLog> out) throws Exception {
if (!value.isEmpty()){
IpUtils.updateIpLook(value);
}
}
private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
DosEventLog result = null;
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) {
long diff = sketchSessions - NacosUtils.getIntProperty("static.sensitivity.threshold");
result = getDosEventLog(value, NacosUtils.getIntProperty("static.sensitivity.threshold"), diff, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
result.setSeverity(Severity.MAJOR.severity);
}
return result;
Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold");
long diff = sketchSessions - staticSensitivityThreshold;
return getDosEventLog(value, staticSensitivityThreshold, diff,0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
}
private DosEventLog getDosEventLogByBaseline(DosSketchLog value) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
private DosEventLog getDosEventLogByBaseline(DosSketchLog value,String key) {
String attackType = value.getAttack_type();
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) {
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(dosBaselineThreshold, value);
long diff = sketchSessions - base;
result = getDosEventLog(value, base, diff, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
}
return result;
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(key).get(attackType);
Integer base = getBaseValue(dosBaselineThreshold, value);
long diff = sketchSessions - base;
return getDosEventLog(value, base, diff, 0,BASELINE_CONDITION_TYPE, SESSIONS_TAG);
}
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) {
long base = threshold.getSessionsPerSec();
long diff = value.getSketch_sessions() - base;
DosEventLog result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, SESSIONS_TAG);
if (result == null) {
base = threshold.getPacketsPerSec();
diff = value.getSketch_packets() - base;
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, PACKETS_TAG);
if (result == null) {
base = threshold.getBitsPerSec();
diff = value.getSketch_bytes() - base;
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, BITS_TAG);
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) throws CloneNotSupportedException {
long sessionBase = threshold.getSessionsPerSec();
long pktBase=threshold.getPacketsPerSec();
long bitBase=threshold.getBitsPerSec();
long diffSession = value.getSketch_sessions() - sessionBase;
long diffPkt = value.getSketch_packets() - pktBase;
long diffByte = value.getSketch_bytes() - bitBase;
Double diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
Double diffPktPercent = getDiffPercent(diffPkt, pktBase)*100;
Double diffBitPercent = getDiffPercent(diffByte, bitBase)*100;
long profileId = 0;
DosEventLog result =null;
if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent){
profileId = threshold.getProfileId();
result= getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
}else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent){
profileId = threshold.getProfileId();
result = getDosEventLog(value, pktBase, diffPkt,profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
}else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent){
profileId = threshold.getProfileId();
result = getDosEventLog(value, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
}
/*
ArrayList<DosEventLog> dosEventLogs = new ArrayList<>();
if (result != null){
dosEventLogs.add(result);
Integer[] superiorIds = threshold.getSuperiorIds();
if (superiorIds != null && superiorIds.length > 0){
for (Integer integer:superiorIds){
DosEventLog clone = (DosEventLog) result.clone();
clone.setVsys_id(integer);
clone.setLog_id(SnowflakeId.generateId());
dosEventLogs.add(clone);
}
}
}
*/
return result;
}
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, int type, String tag) {
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
Severity severity = judgeSeverity(percent);
Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold");
if (severity != Severity.NORMAL) {
if (type == BASELINE_CONDITION_TYPE && percent < NacosUtils.getDoubleProperty("baseline.sensitivity.threshold")) {
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
} else {
result = getResult(value, base, severity, percent+1, type, tag);
}else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold){
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}",destinationIp, attackType, base, percent, value);
}else {
// result = getResult(value, base, profileId, severity, percent+1, type, tag);
result = getResult(value, base, profileId, severity, percent, type, tag);
if (type == SENSITIVITY_CONDITION_TYPE){
result.setSeverity(Severity.MAJOR.severity);
}
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result);
}
} else {
@@ -145,14 +195,17 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return result;
}
private DosEventLog getResult(DosSketchLog value, long base, Severity severity, double percent, int type, String tag) {
private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) {
DosEventLog dosEventLog = new DosEventLog();
dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setVsys_id(value.getVsys_id());
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
dosEventLog.setProfile_id(profileId);
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag,dosEventLog));
// dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag,dosEventLog));
dosEventLog.setDestination_ip(value.getDestination_ip());
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
@@ -189,25 +242,31 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
return base;
}
private String getConditions(String percent, long base, long sessions, int type, String tag) {
private String getConditions(String percent, long base, long sessions, int type, String tag,DosEventLog dosEventLog) {
int condition =0;
if ("Minor".equals(dosEventLog.getSeverity())){
condition=50;
}else if ("Warning".equals(dosEventLog.getSeverity())){
condition=100;
}else if ("Major".equals(dosEventLog.getSeverity())){
condition=250;
}else if ("Severe".equals(dosEventLog.getSeverity())){
condition=500;
}else if ("Critical".equals(dosEventLog.getSeverity())){
condition =800;
}
switch (type) {
case STATIC_CONDITION_TYPE:
return new StrBuilder()
.append("Rate > ")
.append(base).append(" ")
.append(tag).append("/s")
.toString();
return "Rate > " +
base + " " +
tag + "/s" + "(>"+condition+"%)";
case BASELINE_CONDITION_TYPE:
return new StrBuilder()
.append(tag).append(" > ")
.append(percent).append(" of baseline")
.toString();
return tag + " > " +
percent + " of baseline";
case SENSITIVITY_CONDITION_TYPE:
return new StrBuilder()
.append(sessions).append(" ")
.append(tag).append("/s Unusually high ")
.append(StringUtils.capitalize(tag))
.toString();
return String.valueOf(sessions) + " " +
tag + "/s Unusually high " +
StringUtils.capitalize(tag);
default:
throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]");
}

View File

@@ -1,15 +1,15 @@
package com.zdjizhi.etl;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosSketchLog;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
@@ -18,14 +18,15 @@ import static com.zdjizhi.sink.OutputStreamSink.outputTag;
/**
* @author 94976
*/
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String,String>, TimeWindow> {
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple3<String,String,Integer>, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
// private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
private static final Log logger = LogFactory.get();
private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
private static final String EMPTY_SOURCE_IP_IPV6 = "::";
@Override
public void process(Tuple2<String, String> keys,
public void process(Tuple3<String,String,Integer> keys,
Context context, Iterable<DosSketchLog> elements,
Collector<DosSketchLog> out) {
DosSketchLog middleResult = getMiddleResult(keys, elements);
@@ -40,7 +41,7 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
}
}
private DosSketchLog getMiddleResult(Tuple2<String, String> keys,Iterable<DosSketchLog> elements){
private DosSketchLog getMiddleResult(Tuple3<String,String,Integer> keys,Iterable<DosSketchLog> elements){
DosSketchLog midResuleLog = new DosSketchLog();
Tuple6<Long, Long, Long,String,Long,Long> values = sketchAggregate(elements);
@@ -48,6 +49,7 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
if (values != null){
midResuleLog.setAttack_type(keys.f0);
midResuleLog.setDestination_ip(keys.f1);
midResuleLog.setVsys_id(keys.f2);
midResuleLog.setSketch_start_time(values.f4);
midResuleLog.setSketch_duration(values.f5);
midResuleLog.setSource_ip(values.f3);

View File

@@ -1,5 +1,7 @@
package com.zdjizhi.etl;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosBaselineThreshold;
import com.zdjizhi.utils.DateUtils;
@@ -9,15 +11,14 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
public class ParseBaselineThreshold {
private static final Logger logger = LoggerFactory.getLogger(ParseBaselineThreshold.class);
// private static final Logger logger = LoggerFactory.getLogger(ParseBaselineThreshold.class);
private static final Log logger = LogFactory.get();
private static ArrayList<String> floodTypeList = new ArrayList<>();
private static Table table = null;
@@ -37,6 +38,8 @@ public class ParseBaselineThreshold {
config.set("hbase.client.retries.number", "3");
config.set("hbase.bulkload.retries.number", "3");
config.set("zookeeper.recovery.retry", "3");
config.set("hbase.defaults.for.version", "2.2.3");
config.set("hbase.defaults.for.version.skip", "true");
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

View File

@@ -1,25 +1,16 @@
package com.zdjizhi.etl;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.CustomFile;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.function.BroadcastProcessFunc;
import com.zdjizhi.source.DosSketchSource;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import com.zdjizhi.utils.JsonMapper;
//import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,9 +24,9 @@ import java.util.*;
public class ParseSketchLog {
private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
// private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
// private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
@@ -43,28 +34,7 @@ public class ParseSketchLog {
}
private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource(){
DataStreamSource<Map<String, byte[]>> broadcastSource=null;
Properties nacosProperties = new Properties();
nacosProperties.put(PropertyKeyConst.SERVER_ADDR,CommonConfig.NACOS_SERVER_ADDR);
nacosProperties.setProperty(PropertyKeyConst.USERNAME, CommonConfig.NACOS_USERNAME);
nacosProperties.setProperty(PropertyKeyConst.PASSWORD, CommonConfig.NACOS_PASSWORD);
if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)){
broadcastSource = DosSketchSource.broadcastSource(nacosProperties,CommonConfig.HDFS_PATH);
}else {
broadcastSource= DosSketchSource.singleBroadcastSource(nacosProperties);
}
MapStateDescriptor<String,Map> descriptor =
new MapStateDescriptor<>("descriptorTest", Types.STRING, TypeInformation.of(Map.class));
BroadcastStream<Map<String, byte[]>> broadcast = broadcastSource.broadcast(descriptor);
// BroadcastConnectedStream<String, List<CustomFile>> connect = DosSketchSource.createDosSketchSource().connect(broadcast);
return DosSketchSource.createDosSketchSource()
.connect(broadcast).process(new BroadcastProcessFunc());
// .flatMap(new FlatSketchLog());
return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog());
}
private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy(){
@@ -78,16 +48,21 @@ public class ParseSketchLog {
public void flatMap(String s, Collector<DosSketchLog> collector) {
try {
if (StringUtil.isNotBlank(s)){
HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
HashMap<String, Object> sketchSource = JSONObject.parseObject(s, HashMap.class);
// HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
String attackType = sketchSource.get("attack_type").toString();
ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list"));
ArrayList<HashMap<String, Object>> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class);
// ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
for (HashMap<String, Object> obj : reportIpList) {
DosSketchLog dosSketchLog = new DosSketchLog();
dosSketchLog.setSketch_start_time(sketchStartTime);
dosSketchLog.setSketch_duration(sketchDuration);
dosSketchLog.setAttack_type(attackType);
dosSketchLog.setVsys_id(vsysId);
String sourceIp = obj.get("source_ip").toString();
String destinationIp = obj.get("destination_ip").toString();
long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());

View File

@@ -1,11 +1,15 @@
package com.zdjizhi.etl;
import com.fasterxml.jackson.databind.JavaType;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
//import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosDetectionThreshold;
import com.zdjizhi.common.DosVsysId;
import com.zdjizhi.utils.HttpClientUtils;
import com.zdjizhi.utils.JsonMapper;
//import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.NacosUtils;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
@@ -13,25 +17,25 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.message.BasicHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author wlh
*/
public class ParseStaticThreshold {
private static Logger logger = LoggerFactory.getLogger(ParseStaticThreshold.class);
// private static Logger logger = LoggerFactory.getLogger(ParseStaticThreshold.class);
private static final Log logger = LogFactory.get();
private static String encryptpwd;
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
// private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
// private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
// private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
static {
//加载加密登录密码
@@ -50,11 +54,14 @@ public class ParseStaticThreshold {
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build());
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
psw = data.get("encryptpwd").toString();
} else {
logger.error(msg);
@@ -84,11 +91,13 @@ public class ParseStaticThreshold {
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms);
String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
token = data.get("token").toString();
} else {
logger.error(msg);
@@ -112,7 +121,8 @@ public class ParseStaticThreshold {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("pageSize", -1);
parms.put("orderBy", "vsysId desc");
// parms.put("orderBy", "vsysId desc");
parms.put("type", 1);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms);
String token = NacosUtils.getStringProperty("bifang.server.token");
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
@@ -120,15 +130,22 @@ public class ParseStaticThreshold {
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
Object list = data.get("list");
if (list != null) {
vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
logger.info("获取到vsysId{}条", vsysIdList.size());
String s = JSONObject.toJSONString(list);
List<DosVsysId> dosVsysIds = JSON.parseArray(JSONObject.toJSONString(list), DosVsysId.class);
// vsysIdList= JSONObject.parseObject(JSONObject.toJSONString(list), DosVsysId.class);
vsysIdList= (ArrayList)dosVsysIds;
// vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
logger.info("获取到vsysId {}条", vsysIdList.size());
} else {
logger.warn("vsysIdList为空");
}
@@ -149,49 +166,59 @@ public class ParseStaticThreshold {
* @return thresholds
*/
private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
ArrayList<DosDetectionThreshold> thresholds = null;
// ArrayList<DosVsysId> vsysId = getVsysId();
ArrayList<DosDetectionThreshold> vsysThresholds = new ArrayList<>();
ArrayList<DosVsysId> vsysIds = getVsysId();
try {
// if (vsysId != null){
// for (DosVsysId dosVsysId : vsysId) {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("pageSize", -1);
parms.put("orderBy", "profileId asc");
parms.put("isValid", 1);
// parms.put("vsysId", dosVsysId.getVsysId());
parms.put("vsysId", 1);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
String token = NacosUtils.getStringProperty("bifang.server.token");
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
Object list = data.get("list");
if (list != null) {
thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
logger.info("获取到静态阈值配置{}条", thresholds.size());
} else {
logger.warn("静态阈值配置为空");
if (vsysIds != null) {
for (DosVsysId dosVsysId : vsysIds) {
Integer vsysId = dosVsysId.getId() == null ? 1 : dosVsysId.getId();
Integer[] superiorIds = dosVsysId.getSuperiorIds();
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("pageSize", -1);
parms.put("orderBy", "profileId asc");
parms.put("isValid", 1);
parms.put("vsysId", vsysId);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
String token = NacosUtils.getStringProperty("bifang.server.token");
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
// HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr,HashMap.class);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
// HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
HashMap<String, Object> data = JSONObject.parseObject(JSONObject.toJSONString(resposeMap.get("data")), HashMap.class);
Object list = data.get("list");
if (list != null) {
// ArrayList<DosDetectionThreshold> thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
// ArrayList<DosDetectionThreshold> thresholds = JSONObject.parseObject(JSONObject.toJSONString(list), ArrayList.class);
List<DosDetectionThreshold> dosDetectionThresholds = JSON.parseArray(JSONObject.toJSONString(list), DosDetectionThreshold.class);
ArrayList<DosDetectionThreshold> thresholds = (ArrayList)dosDetectionThresholds;
for (DosDetectionThreshold dosDetectionThreshold : thresholds) {
dosDetectionThreshold.setSuperiorIds(superiorIds);
vsysThresholds.add(dosDetectionThreshold);
}
logger.info("获取到vsys id是{}静态阈值配置{}条", vsysId, thresholds.size());
} else {
logger.warn("静态阈值配置为空");
}
} else {
logger.error(msg);
}
}
} else {
logger.error(msg);
}
}
}
// }
// }
} catch (Exception e) {
logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e);
}
return thresholds;
return vsysThresholds;
}
/**
@@ -199,14 +226,17 @@ public class ParseStaticThreshold {
*
* @return threshold RangeMap
*/
static HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> createStaticThreshold() {
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap = new HashMap<>(4);
static HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> createStaticThreshold() {
HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap = new HashMap<>(4);
try {
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
for (DosDetectionThreshold threshold : dosDetectionThreshold) {
String attackType = threshold.getAttackType();
TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create());
int vsysId = threshold.getVsysId();
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> rangeMap = thresholdRangeMap.getOrDefault(vsysId, new HashMap<>());
TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = rangeMap.getOrDefault(attackType, TreeRangeMap.create());
ArrayList<String> serverIpList = threshold.getServerIpList();
for (String sip : serverIpList) {
IPAddressString ipAddressString = new IPAddressString(sip);
@@ -239,7 +269,8 @@ public class ParseStaticThreshold {
}
}
}
thresholdRangeMap.put(attackType, treeRangeMap);
rangeMap.put(attackType, treeRangeMap);
thresholdRangeMap.put(vsysId, rangeMap);
}
}
} catch (Exception e) {
@@ -249,22 +280,27 @@ public class ParseStaticThreshold {
}
public static void main(String[] args) {
/*
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
dosDetectionThreshold.forEach(System.out::println);
// dosDetectionThreshold.forEach(System.out::println);
getVsysId().forEach(System.out::println);
System.out.println("------------------------");
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
*/
HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> staticThreshold = createStaticThreshold();
System.out.println("------------------------");
for (String type : staticThreshold.keySet()) {
Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = staticThreshold.get(type).asMapOfRanges();
for (Range<IPAddress> range : asMapOfRanges.keySet()) {
DosDetectionThreshold threshold = asMapOfRanges.get(range);
System.out.println(type + "---" + range + "---" + threshold);
for (Integer integer : staticThreshold.keySet()) {
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> stringTreeRangeMapHashMap = staticThreshold.get(integer);
for (String type : stringTreeRangeMapHashMap.keySet()) {
Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = stringTreeRangeMapHashMap.get(type).asMapOfRanges();
for (Range<IPAddress> range : asMapOfRanges.keySet()) {
DosDetectionThreshold threshold = asMapOfRanges.get(range);
System.out.println(integer + "---" + type + "---" + range + "---" + threshold);
}
System.out.println("------------------------");
}
System.out.println("------------------------");
}
// String s = loginBifangServer();
// System.out.println(s);

View File

@@ -1,15 +1,15 @@
package com.zdjizhi.etl;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosMetricsLog;
import com.zdjizhi.common.DosSketchLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class TrafficServerIpMetrics {
private static final Logger logger = LoggerFactory.getLogger(TrafficServerIpMetrics.class);
// private static final Logger logger = LoggerFactory.getLogger(TrafficServerIpMetrics.class);
private static final Log logger = LogFactory.get();
static DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) {
DosMetricsLog dosMetricsLog = new DosMetricsLog();
@@ -19,6 +19,7 @@ class TrafficServerIpMetrics {
dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions());
dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets());
dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes());
dosMetricsLog.setVsys_id(midResuleLog.getVsys_id());
dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip()));
logger.debug("metric 结果已加载:{}",dosMetricsLog.toString());
return dosMetricsLog;

View File

@@ -1,76 +0,0 @@
package com.zdjizhi.function;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CustomFile;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.etl.ParseSketchLog;
import com.zdjizhi.utils.IpUtils;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class BroadcastProcessFunc extends BroadcastProcessFunction<String, Map<String, byte[]>, DosSketchLog> {
private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("begin init");
IpUtils.loadIpLook();
System.out.println("init over");
}
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector<DosSketchLog> out) throws Exception {
try {
if (StringUtil.isNotBlank(value)){
HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(value, hashmapJsonType);
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
String attackType = sketchSource.get("attack_type").toString();
ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
for (HashMap<String, Object> obj : reportIpList) {
DosSketchLog dosSketchLog = new DosSketchLog();
dosSketchLog.setSketch_start_time(sketchStartTime);
dosSketchLog.setSketch_duration(sketchDuration);
dosSketchLog.setAttack_type(attackType);
String sourceIp = obj.get("source_ip").toString();
String destinationIp = obj.get("destination_ip").toString();
long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
dosSketchLog.setSource_ip(sourceIp);
dosSketchLog.setDestination_ip(destinationIp);
dosSketchLog.setSketch_sessions(sketchSessions);
dosSketchLog.setSketch_packets(sketchPackets);
dosSketchLog.setSketch_bytes(sketchBytes);
out.collect(dosSketchLog);
logger.debug("数据解析成功:{}",dosSketchLog.toString());
}
}
} catch (Exception e) {
logger.error("数据解析错误:{} \n{}",value,e);
}
}
@Override
public void processBroadcastElement(Map<String, byte[]> value, Context ctx, Collector<DosSketchLog> out) throws Exception {
IpUtils.updateIpLook(value);
}
}

View File

@@ -1,8 +1,9 @@
package com.zdjizhi.sink;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.utils.JsonMapper;
//import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -13,7 +14,8 @@ class DosEventSink {
static void dosEventOutputSink(SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream){
dosEventLogOutputStream
.filter(Objects::nonNull)
.map(JsonMapper::toJsonString)
// .map(JsonMapper::toJsonString)
.map(JSONObject::toJSONString)
.addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
.setParallelism(CommonConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
}

View File

@@ -1,5 +1,8 @@
package com.zdjizhi.sink;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosMetricsLog;
@@ -7,21 +10,27 @@ import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.etl.DosDetection;
import com.zdjizhi.etl.EtlProcessFunction;
import com.zdjizhi.etl.ParseSketchLog;
import com.zdjizhi.source.DosSketchSource;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
/**
* @author 94976
*/
public class OutputStreamSink {
private static final Logger logger = LoggerFactory.getLogger(OutputStreamSink.class);
// private static final Logger logger = LoggerFactory.getLogger(OutputStreamSink.class);
private static final Log logger = LogFactory.get();
public static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics"){};
@@ -37,7 +46,29 @@ public class OutputStreamSink {
}
private static SingleOutputStreamOperator<DosEventLog> getEventSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
return middleStream.map(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
DataStreamSource<Map<String, String>> broadcastSource=null;
Properties nacosProperties = new Properties();
nacosProperties.put(PropertyKeyConst.SERVER_ADDR,CommonConfig.NACOS_SERVER_ADDR);
nacosProperties.setProperty(PropertyKeyConst.USERNAME, CommonConfig.NACOS_USERNAME);
nacosProperties.setProperty(PropertyKeyConst.PASSWORD, CommonConfig.NACOS_PASSWORD);
nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, CommonConfig.NACOS_NAMESPACE);
if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)){
broadcastSource = DosSketchSource.broadcastSource(nacosProperties);
}else {
broadcastSource= DosSketchSource.singleBroadcastSource(nacosProperties);
}
MapStateDescriptor<String,Map> descriptor =
new MapStateDescriptor<>("descriptorTest", Types.STRING, TypeInformation.of(Map.class));
BroadcastStream<Map<String, String>> broadcast = broadcastSource.broadcast(descriptor);
return middleStream
.connect(broadcast)
.process(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
}
private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){
@@ -48,12 +79,13 @@ public class OutputStreamSink {
.setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM);
}
private static class KeysSelector implements KeySelector<DosSketchLog, Tuple2<String, String>>{
private static class KeysSelector implements KeySelector<DosSketchLog, Tuple3<String, String, Integer>>{
@Override
public Tuple2<String, String> getKey(DosSketchLog dosSketchLog){
return Tuple2.of(
public Tuple3<String, String, Integer> getKey(DosSketchLog dosSketchLog){
return Tuple3.of(
dosSketchLog.getAttack_type(),
dosSketchLog.getDestination_ip());
dosSketchLog.getDestination_ip(),
dosSketchLog.getVsys_id());
}
}

View File

@@ -1,5 +1,6 @@
package com.zdjizhi.sink;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosMetricsLog;
import com.zdjizhi.common.DosSketchLog;
@@ -14,7 +15,8 @@ class TrafficServerIpMetricsSink {
static void sideOutputMetricsSink(SingleOutputStreamOperator<DosSketchLog> outputStream){
DataStream<DosMetricsLog> sideOutput = outputStream.getSideOutput(outputTag);
sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
// sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
sideOutput.map(JSONObject::toJSONString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
.setParallelism(CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);

View File

@@ -36,11 +36,11 @@ public class DosSketchSource {
}
public static DataStreamSource<Map<String, byte[]>> broadcastSource(Properties nacosProperties, String STORE_PATH){
return streamExeEnv.addSource(new HttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT,STORE_PATH));
public static DataStreamSource<Map<String, String>> broadcastSource(Properties nacosProperties){
return streamExeEnv.addSource(new HttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT));
}
public static DataStreamSource<Map<String, byte[]>> singleBroadcastSource(Properties nacosProperties){
public static DataStreamSource<Map<String, String>>singleBroadcastSource(Properties nacosProperties){
return streamExeEnv.addSource(new SingleHttpSource(nacosProperties, CommonConfig.NACOS_DATA_ID, CommonConfig.NACOS_GROUP, CommonConfig.NACOS_READ_TIMEOUT));
}
}

View File

@@ -2,11 +2,14 @@ package com.zdjizhi.source;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.io.file.FileReader;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.json.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.fasterxml.jackson.databind.JavaType;
import com.google.common.base.Joiner;
import com.jayway.jsonpath.JsonPath;
@@ -27,12 +30,19 @@ import java.util.*;
import java.util.concurrent.Executor;
public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
public class HttpSource extends RichHttpSourceFunction<Map<String, String>> {
private static final Logger logger = LoggerFactory.getLogger(HttpSource.class);
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
private static Map<String, String> knowledgeMetaCache = new HashMap<>();
private static HashMap<String, String> knowledgeUpdateCache;
private static final int TRY_TIMES = 3;
private static HttpClientUtils2 httpClientUtils;
//连接nacos的配置
private Properties nacosProperties;
@@ -45,48 +55,53 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
//nacos 连接超时时间
private long NACOS_READ_TIMEOUT;
//上传到hdfs的路径
private String STORE_PATH;
private ConfigService configService;
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
private static Map<String, String> updateMap = new HashMap<>();
private static HashMap<String, byte[]> knowledgeFileCache;
private static Header header;
//运行状态cancel时置为false
private boolean isRunning = true;
//是否下发,默认不发送
private boolean isSending = false;
public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) {
public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
this.nacosProperties = nacosProperties;
this.NACOS_DATA_ID = NACOS_DATA_ID;
this.NACOS_GROUP = NACOS_GROUP;
this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT;
this.STORE_PATH = storePath;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
httpClientUtils = new HttpClientUtils2();
//初始化元数据缓存
updateMap = new HashMap<>(16);
knowledgeMetaCache = new HashMap<>(16);
//初始化定位库缓存
knowledgeFileCache = new HashMap<>(16);
knowledgeUpdateCache = new HashMap<>(16);
header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
//连接nacos配置
try {
configService = NacosFactory.createConfigService(nacosProperties);
}catch (NacosException e){
logger.error("Get Schema config from Nacos error,The exception message is :{}", e.getMessage());
}
//初始化知识库
initKnowledge();
logger.info("连接nacos" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
configService = NacosFactory.createConfigService(nacosProperties);
}
@Override
public void run(SourceContext ctx) throws Exception {
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format = formatter.format(new Date());
logger.info(format + "receive config from nacos:" + config);
System.out.println(format + "receive config from nacos:" + config);
if (StringUtil.isNotBlank(config)) {
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
loadKnowledge(metaList);
if (!knowledgeUpdateCache.isEmpty()){
ctx.collect(knowledgeUpdateCache);
knowledgeUpdateCache.clear();
}
// }
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@@ -101,78 +116,141 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
logger.info("receive update config:" + configMsg);
if (StringUtil.isNotBlank(configMsg)) {
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() >= 1) {
if (metaList.size() > 0) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
if (!sha256.equals(updateMap.get(fileName))) {
updateMap.put(fileName, sha256);
updateKnowledge(fileName, filePath);
if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
knowledgeMetaCache.put(fileName, sha256);
updateKnowledge(fileName, filePath,sha256);
}
}
ctx.collect(knowledgeFileCache);
if (!knowledgeUpdateCache.isEmpty()){
ctx.collect(knowledgeUpdateCache);
knowledgeUpdateCache.clear();
}
}
}
} catch (Exception e) {
logger.error("监听nacos配置失败", e);
}
System.out.println(configMsg);
}
});
while (isRunning) {
Thread.sleep(10000);
try {
Thread.sleep(10000);
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void loadKnowledge(ArrayList<Object> metaList) {
InputStream inputStream = null;
private void initKnowledge(){
String configMsg = "";
try {
if (metaList.size() >= 1) {
configMsg=configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
} catch (NacosException e) {
logger.error("从Nacos获取知识库元数据配置文件异常异常信息为:{}", e.getMessage());
}
if (StringUtil.isNotBlank(configMsg)){
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() > 0) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
updateMap.put(fileName, sha256);
knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
byte[] localFileByte = getLocalFile(fileName);
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)){
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256);
knowledgeMetaCache.put(fileName, sha256);
}else {
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等更新本地文件及缓存", fileName, localFileSha256Hex, sha256);
updateKnowledge(fileName,filePath,sha256);
}
}
}
}
}
private void updateKnowledge(String fileName, String filePath,String sha256) {
InputStream inputStream = null;
int retryNum = 0;
try {
while (retryNum < TRY_TIMES){
inputStream = httpClientUtils.httpGetInputStream(filePath, 90000, header);
if (inputStream !=null){
byte[] downloadBytes = IOUtils.toByteArray(inputStream);
String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
if (sha256.equals(downloadFileSha256Hex)&& downloadBytes.length > 0 ){
logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256);
boolean updateStatus = updateLocalFile(fileName, downloadBytes);
if (updateStatus){
knowledgeMetaCache.put(fileName,sha256);
knowledgeUpdateCache.put(fileName, sha256);
retryNum = TRY_TIMES;
}else {
retryNum++;
//避免频繁请求HOS
Thread.sleep(10000);
}
}else {
logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum);
retryNum++;
//避免频繁请求HOS
Thread.sleep(10000);
}
}
}
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
}
}
private void updateKnowledge(String fileName, String filePath) {
InputStream inputStream = null;
private boolean updateLocalFile(String fileName,byte[] downloadBytes) {
FileOutputStream outputStream = null;
boolean updateStatus = false;
try {
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
byte[] bytes = IOUtils.toByteArray(inputStream);
HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, bytes);
knowledgeFileCache.put(fileName, bytes);
} catch (IOException ioException) {
ioException.printStackTrace();
HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, downloadBytes);
updateStatus=true;
} catch (IOException ioe) {
logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage());
ioe.printStackTrace();
} catch (RuntimeException e) {
logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage());
e.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
}
return updateStatus;
}
private static byte[] getLocalFile(String name) {
byte[] fileBytes = null;
try {
fileBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) ;
} catch (RuntimeException | IOException e) {
logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage());
e.printStackTrace();
}
return fileBytes;
}
@Override
public void cancel() {
this.isRunning = false;

View File

@@ -2,11 +2,14 @@ package com.zdjizhi.source;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.io.file.FileReader;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.json.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.fasterxml.jackson.databind.JavaType;
import com.google.common.base.Joiner;
import com.jayway.jsonpath.JsonPath;
@@ -23,13 +26,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executor;
public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
public class SingleHttpSource extends RichHttpSourceFunction<Map<String, String>> {
private static final Logger logger = LoggerFactory.getLogger(HttpSource.class);
private static HashMap<String, byte[]> knowledgeFileCache;
private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class);
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
private static Map<String, String> knowledgeMetaCache = new HashMap<>();
private static HashMap<String, String> knowledgeUpdateCache;
private static final int TRY_TIMES = 3;
private Properties nacosProperties;
@@ -39,20 +51,16 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
private long NACOS_READ_TIMEOUT;
private static String STORE_PATH;
private static HttpClientUtils2 httpClientUtils ;
private ConfigService configService;
// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
private static Map<String, String> updateMap = new HashMap<>();
private static Header header;
private boolean isRunning = true;
public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
this.nacosProperties = nacosProperties;
this.NACOS_DATA_ID = NACOS_DATA_ID;
@@ -65,33 +73,32 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
logger.info("连接nacos" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
configService = NacosFactory.createConfigService(nacosProperties);
httpClientUtils = new HttpClientUtils2();
//初始化元数据缓存
updateMap = new HashMap<>(16);
knowledgeMetaCache = new HashMap<>(16);
//初始化定位库缓存
knowledgeFileCache = new HashMap<>(16);
knowledgeUpdateCache = new HashMap<>(16);
header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
//连接nacos配置
try {
configService = NacosFactory.createConfigService(nacosProperties);
}catch (NacosException e){
logger.error("Get Schema config from Nacos error,The exception message is :{}", e.getMessage());
}
//初始化知识库
initKnowledge();
logger.info("连接nacos" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
}
@Override
public void run(SourceContext ctx) throws Exception {
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
// List<CustomFile> customFiles = new ArrayList<>();
if (StringUtil.isNotBlank(config)) {
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
loadKnowledge(metaList);
if (!knowledgeUpdateCache.isEmpty()){
ctx.collect(knowledgeUpdateCache);
knowledgeUpdateCache.clear();
}
// if (StringUtil.isNotBlank(config)) {
// List<KnowledgeLog> knowledgeLogListList = jsonMapperInstance.fromJson(config, listType);
// if (knowledgeLogListList.size()>=1){
// for (KnowledgeLog knowledgeLog : knowledgeLogListList) {
// String name = knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat());
// String sha256 = knowledgeLog.getSha256();
// updateMap.put(name,sha256);
// }
// }
// }
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@Override
@@ -105,20 +112,22 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
logger.info("receive update config:" + configMsg);
if (StringUtil.isNotBlank(configMsg)) {
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() >= 1) {
if (metaList.size() > 0) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
if (!sha256.equals(updateMap.get(fileName))) {
updateMap.put(fileName, sha256);
updateKnowledge(fileName, filePath);
if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
knowledgeMetaCache.put(fileName, sha256);
updateKnowledge(fileName, filePath,sha256);
}
}
ctx.collect(knowledgeFileCache);
if (!knowledgeUpdateCache.isEmpty()){
ctx.collect(knowledgeUpdateCache);
knowledgeUpdateCache.clear();
}
}
}
@@ -130,85 +139,129 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
});
while (isRunning) {
Thread.sleep(10000);
try {
Thread.sleep(10000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
// private CustomFile loadKnowledge(String fileName, String filePath) {
// InputStream inputStream = null;
// FileOutputStream outputStream = null;
// CustomFile customFile = new CustomFile();
// try {
// customFile.setFileName(fileName);
// Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
// HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
// inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
// FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
// File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
// outputStream = new FileOutputStream(file);
// byte[] bytes = IOUtils.toByteArray(inputStream);
// customFile.setContent(bytes);
// inputStream = new ByteArrayInputStream(customFile.getContent());
// IoUtil.copy(inputStream, outputStream);
//
// } catch (IOException ioException) {
// ioException.printStackTrace();
// } finally {
// IOUtils.closeQuietly(inputStream);
// IOUtils.closeQuietly(outputStream);
// }
// return customFile;
// }
private void loadKnowledge(ArrayList<Object> metaList) {
InputStream inputStream = null;
try {
if (metaList.size() >= 1) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
updateMap.put(fileName, sha256);
knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
private void initKnowledge(){
String configMsg = "";
try {
configMsg=configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
} catch (NacosException e) {
logger.error("从Nacos获取知识库元数据配置文件异常异常信息为:{}", e.getMessage());
}
if (StringUtil.isNotBlank(configMsg)){
ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
if (metaList.size() > 0) {
for (Object metadata : metaList) {
JSONObject knowledgeJson = new JSONObject(metadata, false, true);
String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
byte[] localFileByte = getLocalFile(fileName);
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)){
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256);
knowledgeMetaCache.put(fileName, sha256);
}else {
logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等更新本地文件及缓存", fileName, localFileSha256Hex, sha256);
updateKnowledge(fileName,filePath,sha256);
}
}
}
}
} catch (IOException ioException) {
ioException.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
}
}
private void updateKnowledge(String fileName, String filePath) {
private void updateKnowledge(String fileName, String filePath,String sha256) {
InputStream inputStream = null;
FileOutputStream outputStream = null;
int retryNum = 0;
try {
while (retryNum < TRY_TIMES){
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
if (inputStream !=null){
byte[] downloadBytes = IOUtils.toByteArray(inputStream);
String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
if (sha256.equals(downloadFileSha256Hex)&& downloadBytes.length > 0 ){
logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256);
boolean updateStatus = updateLocalFile(fileName, downloadBytes);
if (updateStatus){
knowledgeMetaCache.put(fileName,sha256);
knowledgeUpdateCache.put(fileName, sha256);
retryNum = TRY_TIMES;
}else {
retryNum++;
//避免频繁请求HOS
Thread.sleep(10000);
}
// isSending = true;
}else {
logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum);
retryNum++;
//避免频繁请求HOS
Thread.sleep(10000);
}
}
}
} catch (IOException ioException) {
ioException.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
}
}
private boolean updateLocalFile(String fileName,byte[] downloadBytes) {
FileOutputStream outputStream = null;
boolean updateStatus = false;
try {
Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
outputStream = new FileOutputStream(file);
byte[] bytes = IOUtils.toByteArray(inputStream);
knowledgeFileCache.put(fileName, bytes);
inputStream=new ByteArrayInputStream(bytes);
IoUtil.copy(inputStream, outputStream);
} catch (IOException ioException) {
ioException.printStackTrace();
IoUtil.copy(new ByteArrayInputStream(downloadBytes), outputStream);
updateStatus=true;
} catch (IOException ioe) {
logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage());
ioe.printStackTrace();
} catch (RuntimeException e) {
logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage());
e.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
}
return updateStatus;
}
private static byte[] getLocalFile(String name) {
byte[] fileBytes = null;
try {
fileBytes=new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes();
} catch (RuntimeException e) {
logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage());
e.printStackTrace();
}
return fileBytes;
}
@Override
public void cancel() {
this.isRunning = false;

View File

@@ -1,9 +1,9 @@
package com.zdjizhi.utils;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -17,7 +17,8 @@ import java.util.concurrent.locks.Lock;
public class DistributedLock implements Lock, Watcher {
private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
// private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
private static final Log logger = LogFactory.get();
private ZooKeeper zk = null;
/**

View File

@@ -59,7 +59,7 @@ public class HdfsUtils {
public static void uploadFileByBytes(String filePath,byte[] bytes) throws IOException {
try (FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath), true)) {
fsDataOutputStream.write(bytes);
fsDataOutputStream.flush();
// fsDataOutputStream.flush();
} catch (RuntimeException e) {
logger.error("Uploading files to the HDFS is abnormal. Message is :" + e.getMessage());
} catch (IOException e) {

View File

@@ -1,5 +1,7 @@
package com.zdjizhi.utils;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.CommonConfig;
import org.apache.http.*;
import org.apache.http.client.ClientProtocolException;
@@ -18,8 +20,6 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
@@ -38,7 +38,8 @@ public class HttpClientUtils {
/** 全局连接池对象 */
private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager();
private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
// private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
private static final Log logger = LogFactory.get();
public static final String ERROR_MESSAGE = "-1";
/*

View File

@@ -1,11 +1,14 @@
package com.zdjizhi.utils;
import cn.hutool.core.io.file.FileReader;
import cn.hutool.crypto.digest.DigestUtil;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.CustomFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
@@ -86,20 +89,126 @@ public class IpUtils {
}
}
public static void updateIpLook(Map<String, byte[]> knowledgeFileCache){
public static void updateIpLook(Map<String, String> knowledgeFileCache){
try{
IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
ipLookup= builder.loadDataFileV4(new ByteArrayInputStream(knowledgeFileCache.get("ip_v4_built_in.mmdb")))
.loadDataFileV6(new ByteArrayInputStream(knowledgeFileCache.get("ip_v6_built_in.mmdb")))
.loadDataFilePrivateV4(new ByteArrayInputStream(knowledgeFileCache.get("ip_v4_user_defined.mmdb")))
.loadDataFilePrivateV6(new ByteArrayInputStream(knowledgeFileCache.get("ip_v6_user_defined.mmdb")))
.build();
if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)) {
byte[] ipv4BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_built_in.mmdb");
if (ipv4BuiltBytes!=null){
if (knowledgeFileCache.containsKey("ip_v4_built_in.mmdb")){
String sha256 = knowledgeFileCache.get("ip_v4_built_in.mmdb");
byte[] localFileByte = getLocalFile("ip_v4_built_in.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)){
builder.loadDataFileV4(new ByteArrayInputStream(ipv4BuiltBytes));
}
}
}
byte[] ipv6BuiltBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_built_in.mmdb");
if (ipv6BuiltBytes!=null){
if (knowledgeFileCache.containsKey("ip_v6_built_in.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v6_built_in.mmdb");
byte[] localFileByte = getLocalFile("ip_v6_built_in.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFileV6(new ByteArrayInputStream(ipv6BuiltBytes));
}
}
}
byte[] ipv4UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v4_user_defined.mmdb");
if (ipv4UserBytes!=null){
if (knowledgeFileCache.containsKey("ip_v4_user_defined.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v4_user_defined.mmdb");
byte[] localFileByte = getLocalFile("ip_v4_user_defined.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFilePrivateV4(new ByteArrayInputStream(ipv4UserBytes));
}
}
}
byte[] ipv6UserBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + "ip_v6_user_defined.mmdb");
if (ipv6UserBytes!=null){
if (knowledgeFileCache.containsKey("ip_v6_user_defined.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v6_user_defined.mmdb");
byte[] localFileByte = getLocalFile("ip_v6_user_defined.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFilePrivateV6(new ByteArrayInputStream(ipv6UserBytes));
}
}
}
}else if ("SINGLE".equals(CommonConfig.CLUSTER_OR_SINGLE)){
byte[] ipv4BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_built_in.mmdb");
if (ipv4BuiltBytes!=null){
if (knowledgeFileCache.containsKey("ip_v4_built_in.mmdb")){
String sha256 = knowledgeFileCache.get("ip_v4_built_in.mmdb");
byte[] localFileByte = getLocalFile("ip_v4_built_in.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)){
builder.loadDataFileV4(new ByteArrayInputStream(ipv4BuiltBytes));
}
}
}
byte[] ipv6BuiltBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_built_in.mmdb");
if (ipv6BuiltBytes!=null){
if (knowledgeFileCache.containsKey("ip_v6_built_in.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v6_built_in.mmdb");
byte[] localFileByte = getLocalFile("ip_v6_built_in.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFileV6(new ByteArrayInputStream(ipv6BuiltBytes));
}
}
}
byte[] ipv4UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v4_user_defined.mmdb");
if (ipv4UserBytes!=null){
if (knowledgeFileCache.containsKey("ip_v4_user_defined.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v4_user_defined.mmdb");
byte[] localFileByte = getLocalFile("ip_v4_user_defined.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFilePrivateV4(new ByteArrayInputStream(ipv4UserBytes));
}
}
}
byte[] ipv6UserBytes = FileByteUtils.getFileBytes(CommonConfig.DOWNLOAD_PATH + "ip_v6_user_defined.mmdb");
if (ipv6UserBytes!=null){
if (knowledgeFileCache.containsKey("ip_v6_user_defined.mmdb")) {
String sha256 = knowledgeFileCache.get("ip_v6_user_defined.mmdb");
byte[] localFileByte = getLocalFile("ip_v6_user_defined.mmdb");
String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
if (sha256.equals(localFileSha256Hex)) {
builder.loadDataFilePrivateV6(new ByteArrayInputStream(ipv6UserBytes));
}
}
}
}
ipLookup = builder.build();
}catch (Exception e){
LOG.error("加载失败",e);
}
}
private static byte[] getLocalFile(String name) {
byte[] fileBytes = null;
try {
fileBytes = "CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE) ?
HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) :
new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes();
} catch (RuntimeException | IOException e) {
e.printStackTrace();
}
return fileBytes;
}
public static void main(String[] args) {
System.out.println(ipLookup.countryLookup("49.7.115.37"));

View File

@@ -1,11 +1,11 @@
package com.zdjizhi.utils;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringReader;
@@ -13,7 +13,8 @@ import java.util.Properties;
import java.util.concurrent.Executor;
public class NacosUtils {
private static final Logger logger = LoggerFactory.getLogger(NacosUtils.class);
// private static final Logger logger = LoggerFactory.getLogger(NacosUtils.class);
private static final Log logger = LogFactory.get();
private static Properties nacosProperties = new Properties();
private static Properties commonProperties = new Properties();

View File

@@ -1,11 +1,12 @@
package com.zdjizhi.utils;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.CommonConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SnowflakeId {
private static final Logger logger = LoggerFactory.getLogger(SnowflakeId.class);
// private static final Logger logger = LoggerFactory.getLogger(SnowflakeId.class);
private static final Log logger = LogFactory.get();
/**
* 共64位 第一位为符号位 默认0

View File

@@ -1,11 +1,11 @@
package com.zdjizhi.utils;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
@@ -13,7 +13,8 @@ import java.util.concurrent.CountDownLatch;
public class ZookeeperUtils implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperUtils.class);
// private static final Logger logger = LoggerFactory.getLogger(ZookeeperUtils.class);
private static final Log logger = LogFactory.get();
private ZooKeeper zookeeper;

View File

@@ -5,7 +5,7 @@ stream.execution.environment.parallelism=1
stream.execution.job.name=DOS-DETECTION-APPLICATION
#输入kafka并行度大小
kafka.input.parallelism=1
kafka.input.parallelism=3
#输入kafka topic名
kafka.input.topic.name=DOS-SKETCH-RECORD
@@ -15,18 +15,18 @@ kafka.input.topic.name=DOS-SKETCH-RECORD
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#读取kafka group id
kafka.input.group.id=2112080949
kafka.input.group.id=dos-detection-job-221125-1
#kafka.input.group.id=dos-detection-job-210813-1
#发送kafka metrics并行度大小
kafka.output.metric.parallelism=1
kafka.output.metric.parallelism=3
#发送kafka metrics topic名
#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
kafka.output.metric.topic.name=test
#发送kafka event并行度大小
kafka.output.event.parallelism=1
kafka.output.event.parallelism=3
#发送kafka event topic名
#kafka.output.event.topic.name=DOS-EVENT
@@ -52,7 +52,7 @@ hbase.baseline.table.name=dos:ddos_traffic_baselines
hbase.baseline.total.num=1000000
#baseline ttl单位
hbase.baseline.ttl=30
hbase.baseline.ttl=10
#设置聚合并行度2个key
flink.first.agg.parallelism=1
@@ -61,10 +61,10 @@ flink.first.agg.parallelism=1
flink.detection.map.parallelism=1
#watermark延迟
flink.watermark.max.orderness=10
flink.watermark.max.orderness=300
#计算窗口大小默认600s
flink.window.max.time=10
flink.window.max.time=600
#dos event结果中distinct source IP限制
source.ip.list.limit=10000
@@ -83,9 +83,6 @@ ip.mmdb.path=D:\\data\\dat\\bak\\
bifang.server.uri=http://192.168.44.72:80
#bifang.server.uri=http://192.168.44.3:80
#访问bifang只读权限tokenbifang内置无需修改
bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867
#加密密码路径信息
bifang.server.encryptpwd.path=/v1/user/encryptpwd
@@ -123,23 +120,11 @@ baseline.threshold.schedule.days=1
#kafka用户认证配置参数
sasl.jaas.config.user=admin
#sasl.jaas.config.password=galaxy2019
#sasl.jaas.config.password=ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)
sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#是否开启kafka用户认证配置10
sasl.jaas.config.flag=1
#nacos配置
#nacos.server.addr=192.168.44.12:8848
#nacos.namespace=public
#nacos.username=nacos
#nacos.password=nacos
#nacos.data.id=knowledge_base.json
#nacos.group=DEFAULT_GROUP
#nacos.read.timeout=5000
############################## Nacos 配置 ######################################
nacos.server.addr=192.168.44.12:8848
nacos.username=nacos
@@ -156,30 +141,6 @@ nacos.static.namespace=test
nacos.static.data.id=dos_detection.properties
nacos.static.group=Galaxy
############################## HTTP 配置 ######################################
#http请求相关参数
#最大连接数
#http.pool.max.connection=400
#
##单路由最大连接数
#http.pool.max.per.route=80
#
##向服务端请求超时时间设置(单位:毫秒)
#http.pool.request.timeout=60000
#
##向服务端连接超时时间设置(单位:毫秒)
#http.pool.connect.timeout=60000
#
##服务端响应超时时间设置(单位:毫秒)
#http.pool.response.timeout=60000
#server.uri=http://192.168.44.12:9098
#server.path=/hos/knowledge_base_hos_bucket
############################## hos Token 配置 ######################################
hos.token=c21f969b5f03d33d43e04f8f136e7682
@@ -189,8 +150,8 @@ cluster.or.single=CLUSTER
############################## 集群模式配置文件路径 配置 ######################################
hdfs.path=/test/TEST/
hdfs.uri.nn1=hdfs://192.168.40.151:9000
hdfs.uri.nn2=hdfs://192.168.40.152:9000
hdfs.uri.nn1=192.168.40.151:9000
hdfs.uri.nn2=192.168.40.152:9000
hdfs.user=dos
############################## 单机模式配置文件下载路径 配置 ######################################

View File

@@ -0,0 +1,23 @@
#Log4j
log4j.rootLogger=info,console,file
# 控制台日志设置
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=info
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# 文件日志设置
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=info
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
#路径请用相对路径,做好相关测试输出到应用目下
log4j.appender.file.file=${nis.root}/log/flink-dos-detection.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
#MyBatis 配置com.nis.web.dao是mybatis接口所在包
log4j.logger.com.nis.web.dao=debug
#bonecp数据源配置
log4j.category.com.jolbox=debug,console

View File

@@ -0,0 +1,8 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277141, sketch_duration=59, attack_type='DNS Flood', source_ip='23.91.128.115', destination_ip='102.219.30.33', sketch_sessions=945, sketch_packets=945, sketch_bytes=446370, vsys_id=23}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277205, sketch_duration=86, attack_type='DNS Flood', source_ip='172.217.160.68', destination_ip='10.113.83.88', sketch_sessions=730, sketch_packets=730, sketch_bytes=344575, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277244, sketch_duration=47, attack_type='DNS Flood', source_ip='45.135.144.112', destination_ip='42.62.192.132', sketch_sessions=0, sketch_packets=0, sketch_bytes=47, vsys_id=1}
--DosDetectionThreshold
{profileId='6091', attackType='DNS Flood', serverIpList=[113.113.83.213, 42.62.192.132/28, 10.113.83.1/25, 102.219.30.33/29], serverIpAddr='null', packetsPerSec=1, bitsPerSec=1, sessionsPerSec=1, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}
{profileId='5679', attackType='DNS Flood', serverIpList=[102.219.30.33], serverIpAddr='null', packetsPerSec=500, bitsPerSec=1000000, sessionsPerSec=100000, isValid=1, vsysId=23, superiorIds=[4, 5]}

View File

@@ -0,0 +1,6 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277232, sketch_duration=59, attack_type='ICMP Flood', source_ip='45.170.244.25', destination_ip='24.152.57.56', sketch_sessions=499, sketch_packets=499, sketch_bytes=111970, vsys_id=1}
--DosDetectionThreshold
{profileId='6093', attackType='ICMP Flood', serverIpList=[31.131.80.88/29, 24.152.57.56/29, 47.93.59.1/25], serverIpAddr='null', packetsPerSec=210, bitsPerSec=0, sessionsPerSec=0, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}

View File

@@ -0,0 +1,7 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1685003938, sketch_duration=63714, attack_type='TCP SYN Flood', source_ip='5.32.144.55', destination_ip='45.188.134.11', sketch_sessions=0, sketch_packets=0, sketch_bytes=4195, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277234, sketch_duration=57, attack_type='TCP SYN Flood', source_ip='18.65.148.128', destination_ip='23.200.74.224', sketch_sessions=54, sketch_packets=54, sketch_bytes=73427, vsys_id=1}
--DosDetectionThreshold
{profileId='6095', attackType='TCP SYN Flood', serverIpList=[23.200.74.224, 45.188.134.11/29, 41.183.0.15/29, 41.183.0.16/30], serverIpAddr='null', packetsPerSec=1, bitsPerSec=1, sessionsPerSec=1, isValid=1, vsysId=1, superiorIds=[5, 4, 12, 27]}

View File

@@ -0,0 +1,8 @@
--DosSketchLog
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277291, sketch_duration=0, attack_type='UDP Flood', source_ip='121.14.89.209', destination_ip='192.168.50.11', sketch_sessions=0, sketch_packets=0, sketch_bytes=0, vsys_id=1}
{common_sled_ip='null', common_data_center='null', sketch_start_time=1686277233, sketch_duration=58, attack_type='UDP Flood', source_ip='192.168.50.56,192.168.50.34,192.168.50.11,192.168.50.33,192.168.50.55,192.168.50.58,192.168.50.36,192.168.50.14,192.168.50.35,192.168.50.13,192.168.50.57,192.168.50.30,192.168.50.51,192.168.50.54,192.168.50.10,192.168.50.32,192.168.50.53,192.168.50.31,192.168.50.16,192.168.50.38,192.168.50.15,192.168.50.37,192.168.50.18,192.168.50.17,192.168.50.50,192.168.50.45,192.168.50.23,192.168.50.22,192.168.50.44,192.168.50.25,192.168.50.47,192.168.50.46,192.168.50.24,192.168.50.63,192.168.50.41,192.168.50.40,192.168.50.62,192.168.50.43,192.168.50.21,192.168.50.20,192.168.50.42,192.168.50.27,192.168.50.26,192.168.50.48,192.168.50.28,192.168.50.61,192.168.50.60', destination_ip='121.14.89.209', sketch_sessions=297, sketch_packets=297, sketch_bytes=371404, vsys_id=1}
--DosDetectionThreshold
{profileId='5333', attackType='UDP Flood', serverIpList=[192.168.50.11, 192.168.50.12], serverIpAddr='null', packetsPerSec=50, bitsPerSec=50, sessionsPerSec=50, isValid=1, vsysId=1, superiorIds=[4, 12, 5, 27]}

View File

@@ -0,0 +1,237 @@
package com.zdjizhi.etl;
import com.zdjizhi.common.DosDetectionThreshold;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.utils.IpUtils;
import com.zdjizhi.utils.NacosUtils;
import com.zdjizhi.utils.SnowflakeId;
import com.zdjizhi.utils.StringUtil;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import java.math.BigDecimal;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashSet;
public class DosDetectionTest {
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
private final static int BASELINE_SIZE = 144;
private final static int STATIC_CONDITION_TYPE = 1;
private final static int BASELINE_CONDITION_TYPE = 2;
private final static int SENSITIVITY_CONDITION_TYPE = 3;
private final static String SESSIONS_TAG = "sessions";
private final static String PACKETS_TAG = "packets";
private final static String BITS_TAG = "bits";
@Test
public void dosDetectionTest(){
DosDetectionThreshold dosDetectionThreshold = new DosDetectionThreshold();
ArrayList<String> serverIpList = new ArrayList<>();
serverIpList.add("192.168.50.11");
serverIpList.add("192.168.50.1/24");
serverIpList.add("FC::12:0:0/54");
serverIpList.add("FC::12:0:0");
dosDetectionThreshold.setProfileId(4437);
dosDetectionThreshold.setAttackType("DNS Flood");
dosDetectionThreshold.setServerIpList(serverIpList);
dosDetectionThreshold.setSessionsPerSec(1);
dosDetectionThreshold.setPacketsPerSec(1);
dosDetectionThreshold.setBitsPerSec(100000);
dosDetectionThreshold.setIsValid(1);
dosDetectionThreshold.setSuperiorIds(new Integer[]{5,4,12,27});
DosSketchLog dosSketchLog = new DosSketchLog ();
dosSketchLog.setSketch_sessions(68);
dosSketchLog.setSketch_packets(68);
dosSketchLog.setSketch_bytes(285820);//185.82
dosSketchLog.setVsys_id(1);
dosSketchLog.setAttack_type("ICMP Flood");
dosSketchLog.setSource_ip("45.170.244.25");
dosSketchLog.setDestination_ip("24.152.57.56");
//静态阈值获取
long sessionBase = dosDetectionThreshold.getSessionsPerSec();
long pktBase=dosDetectionThreshold.getPacketsPerSec();
long bitBase=dosDetectionThreshold.getBitsPerSec();
//基于速率进行计算
long diffSession = dosSketchLog.getSketch_sessions() - sessionBase;
long diffPkt = dosSketchLog.getSketch_packets() - pktBase;
long diffByte = dosSketchLog.getSketch_bytes() - bitBase;
Double diffSessionPercent = getDiffPercent(diffSession, sessionBase)*100;
Double diffPktPercent = getDiffPercent(diffPkt, pktBase)*100;
Double diffBitPercent = getDiffPercent(diffByte, bitBase)*100;
long profileId = 0;
DosEventLog result =null;
if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent){
profileId = dosDetectionThreshold.getProfileId();
result= getDosEventLog(dosSketchLog, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
System.out.println(result);
}else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent){
profileId = dosDetectionThreshold.getProfileId();
result = getDosEventLog(dosSketchLog, pktBase, diffPkt,profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
System.out.println(result);
}else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent){
profileId = dosDetectionThreshold.getProfileId();
result = getDosEventLog(dosSketchLog, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
System.out.println(result);
}
}
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, long profileId, int type, String tag) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
Severity severity = judgeSeverity(percent);
Integer staticSensitivityThreshold = 100;
if (severity != Severity.NORMAL) {
if (type == BASELINE_CONDITION_TYPE && percent < 0.2) {
// logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
}else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold){
// logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}",destinationIp, attackType, base, percent, value);
}else {
result = getResult(value, base, profileId, severity, percent+1, type, tag);
if (type == SENSITIVITY_CONDITION_TYPE){
result.setSeverity(Severity.MAJOR.severity);
}
// logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result);
}
}
// else {
// logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value);
// }
}
return result;
}
private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) {
DosEventLog dosEventLog = new DosEventLog();
// dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setVsys_id(value.getVsys_id());
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
dosEventLog.setProfile_id(profileId);
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.severity);
// dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag,dosEventLog));
dosEventLog.setDestination_ip(value.getDestination_ip());
// dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
dosEventLog.setSource_ip_list(ipList);
dosEventLog.setSource_country_list(getSourceCountryList(ipList));
dosEventLog.setSession_rate(value.getSketch_sessions());
dosEventLog.setPacket_rate(value.getSketch_packets());
dosEventLog.setBit_rate(value.getSketch_bytes());
return dosEventLog;
}
public String getConditions(double percent, long base, long sessions, int type, String tag,DosEventLog dosEventLog) {
int condition =0;
if ("Minor".equals(dosEventLog.getSeverity())){
condition=50;
}else if ("Warning".equals(dosEventLog.getSeverity())){
condition=100;
}else if ("Major".equals(dosEventLog.getSeverity())){
condition=250;
}else if ("Severe".equals(dosEventLog.getSeverity())){
condition=500;
}else if ("Critical".equals(dosEventLog.getSeverity())){
condition =800;
}
switch (type) {
case STATIC_CONDITION_TYPE:
return "Rate > " +
base + " " +
tag + "/s" + "(>"+condition+"%)";
case BASELINE_CONDITION_TYPE:
return tag + " > " +
PERCENT_INSTANCE.format(percent) + " of baseline";
case SENSITIVITY_CONDITION_TYPE:
return String.valueOf(sessions) + " " +
tag + "/s Unusually high " +
StringUtils.capitalize(tag);
default:
throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]");
}
}
private String getSourceCountryList(String sourceIpList) {
if (StringUtil.isNotBlank(sourceIpList)) {
String countryList;
try {
String[] ipArr = sourceIpList.split(",");
HashSet<String> countrySet = new HashSet<>();
for (String ip : ipArr) {
String country = IpUtils.ipLookup.countryLookup(ip);
if (StringUtil.isNotBlank(country)){
countrySet.add(country);
}
}
countryList = StringUtils.join(countrySet, ", ");
return countryList;
} catch (Exception e) {
// logger.error("{} source IP lists 获取国家失败", sourceIpList, e);
return StringUtil.EMPTY;
}
} else {
throw new IllegalArgumentException("Illegal Argument sourceIpList = null");
}
}
private Double getDiffPercent(long diff, long base) {
return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
}
private Severity judgeSeverity(double diffPercent) {
if (diffPercent >= 0.5 && diffPercent < 1) {
return Severity.MINOR;
} else if (diffPercent >= 1 && diffPercent < 2.5) {
return Severity.WARNING;
} else if (diffPercent >= 2.5 && diffPercent < 5) {
return Severity.MAJOR;
} else if (diffPercent >= 5 && diffPercent < 8) {
return Severity.SEVERE;
} else if (diffPercent >= 8) {
return Severity.CRITICAL;
} else {
return Severity.NORMAL;
}
}
private enum Severity {
/**
* 判断严重程度枚举类型
*/
CRITICAL("Critical"),
SEVERE("Severe"),
MAJOR("Major"),
WARNING("Warning"),
MINOR("Minor"),
NORMAL("Normal");
private final String severity;
@Override
public String toString() {
return this.severity;
}
Severity(String severity) {
this.severity = severity;
}
}
}

View File

@@ -0,0 +1,11 @@
package com.zdjizhi.etl;
import org.junit.Test;
public class EtlProcessFunctionTest {
@Test
public void EtlProcessFunctionTest(){
}
}