40 Commits

Author SHA1 Message Date
wanglihui
0a6f36393c DoS Detection Bifang Access Token 可支持动态配置 2022-07-19 17:20:09 +08:00
unknown
84a1e6879a 修复动态获取nacos配置 2022-06-28 11:15:49 +08:00
unknown
ab8f6aba81 修复动态获取nacos配置 2022-06-28 11:14:54 +08:00
unknown
94e8fb807a 集成nacos,修复依赖冲突,启动报错 2022-06-22 15:58:49 +08:00
wanglihui
cead1d4d99 Merge branch 'tsg-22.06' of git.mesalab.cn:bigdata/tsg/flink-dos-detection 2022-06-20 18:15:24 +08:00
徐鹏飞
2d98c3b6e6 修改获取静态阈值逻辑,根据默认指定vsysID获取 2022-06-13 18:00:57 +08:00
徐鹏飞
3dc29a07be 新增根据vsysId从bifang获取静态阈值 2022-05-31 18:00:36 +08:00
wanglihui
1fcdb79739 DoS事件日志对Conditions基于速率检测属性值修正 2022-05-17 13:54:43 +08:00
wanglihui
3d974217d9 集成nacos,将部分配置放到nacos中管理。 2022-04-08 10:16:29 +08:00
wanglihui
db17064f73 更新工具库到1.0.8适配新版MMDB定位库 2022-02-11 16:06:24 +08:00
wanglihui
065e5abb09 增加jasypt配置加密方式 2021-12-20 13:50:18 +08:00
wanglihui
75bbdd2962 更新DoS检测程序,新增读取baseline TTL配置。
修复DoS检测-Conditions阈值描述语言逻辑问题。
2021-12-08 13:51:09 +08:00
wanglihui
c46a395d9b 修改静态阈值,上下限判定等配置。 2021-11-26 09:46:53 +08:00
wanglihui
cc3f0cf620 解决获取配置为空时空指针异常bug。 2021-11-05 19:01:12 +08:00
wanglihui
0617b1e614 修改获取基线值策略,当基线type=3且基线值小于静态敏感阈值时,将其替换。 2021-10-29 18:43:05 +08:00
wanglihui
0125b031dd 代码格式化,实体类重写equals、hashcode方法。 2021-10-22 18:38:29 +08:00
wanglihui
177e7461cc 优化构建baseline方式 2021-10-21 18:27:48 +08:00
wanglihui
be916531fb 修改构建threshold RangeMap逻辑,基于attack type为key,避免IP冲突问题。 2021-10-20 18:23:12 +08:00
wanglihui
c692112445 增加基于packets与bits作为static条件判断依据。
修复static配置IP冲突问题。
2021-10-19 18:39:13 +08:00
wanglihui
b03ab9642d 修复静态条件与基线条件冲突bug 2021-09-28 16:11:52 +08:00
wanglihui
c44250bf73 新增读取DoS Detection Profiles IP冲突检测机制
修复DoS event日志end_time大于当前时间bug
2021-09-26 18:41:36 +08:00
wanglihui
77bc6a844e 修复读取配置IP冲突问题 2021-09-23 18:36:27 +08:00
wanglihui
e930fa23ed 修改判定逻辑,增加基线敏感阈值作为判定条件。 2021-09-16 18:47:00 +08:00
wanglihui
8cd4dea19e 获取静态阈值列表使用bifang默认token
修复获取baseline下标方法
2021-09-15 10:08:17 +08:00
wanglihui
62f3c65d66 基于DoS Sketch一元组进行实时检测 2021-09-14 18:46:23 +08:00
wanglihui
8cfb442c44 增加一元组作为基线生成数据源 2021-09-13 14:14:58 +08:00
wanglihui
4f8807dfa1 修改计算速率方式,使用session总数除以时间窗口 2021-09-13 09:46:02 +08:00
wanglihui
81f6499458 新增敏感阈值,过滤告警信息
修改计算平均值方式,先聚合再平均
2021-09-09 10:46:50 +08:00
wanglihui
b4237bb4a9 新增kafka sasl认证机制 2021-09-06 16:19:33 +08:00
wanglihui
c5943298bd 修复因double精度问题导致日志判定结果等级错误bug 2021-08-26 18:42:28 +08:00
wanglihui
b4f919647a 新增根据静态阈值判定dos攻击逻辑
新增定时器,定时获取静态阈值与baseline
2021-08-24 16:35:31 +08:00
wanglihui
55af33b508 新增解析静态阈值功能 2021-08-20 18:34:40 +08:00
wanglihui
28e7275674 新增rangeMap存储对应IP段配置信息 2021-08-20 11:52:20 +08:00
wanglihui
f744677021 新增读取bifang静态阈值配置接口
修改galaxy工具类库版本
2021-08-18 19:15:49 +08:00
wanglihui
c957f3ec1c 增加基线值为0时处理逻辑,将0替换为默认值。 2021-08-17 18:56:53 +08:00
wanglihui
9bda526d48 修复判定超出基线百分比逻辑bug 2021-08-17 11:09:45 +08:00
wanglihui
e89e1b08c9 修改处理逻辑,去掉处理机IP与数据中心作为key的判定条件。 2021-08-16 18:24:13 +08:00
wanglihui
e0de04886b 添加配置文件注释、删除过期配置文件 2021-08-11 18:38:53 +08:00
wanglihui
30a24683e3 metrics统计增加根据server IP hashcode分区数。 2021-08-09 18:28:52 +08:00
wanglihui
5190654a8f 修改处理逻辑为内存中缓存baseline数据 2021-08-05 18:42:34 +08:00
31 changed files with 1922 additions and 495 deletions

147
pom.xml
View File

@@ -102,6 +102,13 @@
<dependencies>
<!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
<dependency>
<groupId>org.jasypt</groupId>
<artifactId>jasypt</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -116,61 +123,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<!--&lt;!&ndash; https://mvnrepository.com/artifact/org.apache.flink/flink-table &ndash;&gt;-->
<!--<dependency>-->
<!--<groupId>org.apache.flink</groupId>-->
<!--<artifactId>flink-table</artifactId>-->
<!--<version>${flink.version}</version>-->
<!--&lt;!&ndash;<scope>provided</scope>&ndash;&gt;-->
<!--</dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<!--Flink modules-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
@@ -178,7 +131,7 @@
<!-- CLI dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
@@ -195,6 +148,28 @@
<artifactId>jdk.tools</artifactId>
<groupId>jdk.tools</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.3</version>
<!--<scope>provided</scope>-->
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
@@ -214,11 +189,10 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_2.11</artifactId>
<version>${flink.version}</version>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>
<dependency>
@@ -227,10 +201,16 @@
<version>5.5.2</version>
</dependency>
<dependency>
<groupId>com.github.seancfoley</groupId>
<artifactId>ipaddress</artifactId>
<version>5.3.3</version>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.4</version>
<version>1.0.8</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@@ -246,6 +226,51 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.2.0</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.11</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
</dependencies>

View File

@@ -1,19 +1,27 @@
package com.zdjizhi.common;
import com.zdjizhi.utils.CommonConfigurations;
import com.zdjizhi.utils.NacosUtils;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
* Created by wk on 2021/1/6.
* @author wlh
* @date 2021/1/6
*/
public class CommonConfig {
private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
static {
encryptor.setPassword("galaxy");
}
public static final int STREAM_EXECUTION_ENVIRONMENT_PARALLELISM = CommonConfigurations.getIntProperty("stream.execution.environment.parallelism");
public static final String STREAM_EXECUTION_JOB_NAME = CommonConfigurations.getStringProperty("stream.execution.job.name");
public static final int KAFKA_INPUT_PARALLELISM = CommonConfigurations.getIntProperty("kafka.input.parallelism");
public static final String KAFKA_INPUT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.input.topic.name");
public static final String KAFKA_INPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.input.bootstrap.servers");
public static final String KAFKA_SCAN_STARTUP_MODE = CommonConfigurations.getStringProperty("kafka.input.scan.startup.mode");
public static final String KAFKA_GROUP_ID = CommonConfigurations.getStringProperty("kafka.input.group.id");
public static final int KAFKA_OUTPUT_METRIC_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.metric.parallelism");
@@ -22,28 +30,67 @@ public class CommonConfig {
public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.event.topic.name");
public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.output.bootstrap.servers");
public static final int HBASE_INPUT_PARALLELISM = CommonConfigurations.getIntProperty("hbase.input.parallelism");
public static final String HBASE_ZOOKEEPER_QUORUM = CommonConfigurations.getStringProperty("hbase.zookeeper.quorum");
public static final int HBASE_CLIENT_OPERATION_TIMEOUT = CommonConfigurations.getIntProperty("hbase.client.operation.timeout");
public static final int HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = CommonConfigurations.getIntProperty("hbase.client.scanner.timeout.period");
public static final String HBASE_BASELINE_TABLE_NAME = CommonConfigurations.getStringProperty("hbase.baseline.table.name");
public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num");
public static final int HBASE_BASELINE_TTL = CommonConfigurations.getIntProperty("hbase.baseline.ttl");
public static final int FLINK_FIRST_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.first.agg.parallelism");
public static final int FLINK_SECOND_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.second.agg.parallelism");
public static final int FLINK_DETECTION_MAP_PARALLELISM = CommonConfigurations.getIntProperty("flink.detection.map.parallelism");
public static final int FLINK_WATERMARK_MAX_ORDERNESS = CommonConfigurations.getIntProperty("flink.watermark.max.orderness");
public static final int FLINK_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("flink.window.max.time");
public static final int SOURCE_IP_LIST_LIMIT = CommonConfigurations.getIntProperty("source.ip.list.limit");
public static final int DESTINATION_IP_PARTITION_NUM = CommonConfigurations.getIntProperty("destination.ip.partition.num");
public static final int DATA_CENTER_ID_NUM = CommonConfigurations.getIntProperty("data.center.id.num");
public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path");
public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.minor.threshold");
public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.warning.threshold");
public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.major.threshold");
public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.severe.threshold");
public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.critical.threshold");
// public static final int STATIC_SENSITIVITY_THRESHOLD = NacosUtils.getIntProperty("static.sensitivity.threshold");
// public static final double BASELINE_SENSITIVITY_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sensitivity.threshold");
//
// public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold");
// public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold");
// public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.major.threshold");
// public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold");
// public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold");
public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri");
public static final String BIFANG_SERVER_TOKEN = CommonConfigurations.getStringProperty("bifang.server.token");
public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = CommonConfigurations.getStringProperty("bifang.server.encryptpwd.path");
public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path");
public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path");
public static final String BIFANG_SERVER_POLICY_VSYSID_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.vaysid.path");
public static final int HTTP_POOL_MAX_CONNECTION = CommonConfigurations.getIntProperty("http.pool.max.connection");
public static final int HTTP_POOL_MAX_PER_ROUTE = CommonConfigurations.getIntProperty("http.pool.max.per.route");
public static final int HTTP_POOL_REQUEST_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.request.timeout");
public static final int HTTP_POOL_CONNECT_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.connect.timeout");
public static final int HTTP_POOL_RESPONSE_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.response.timeout");
public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = CommonConfigurations.getIntProperty("static.threshold.schedule.minutes");
public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days");
public static final String SASL_JAAS_CONFIG_USER = CommonConfigurations.getStringProperty("sasl.jaas.config.user");
public static final String SASL_JAAS_CONFIG_PASSWORD = encryptor.decrypt(CommonConfigurations.getStringProperty("sasl.jaas.config.password"));
public static final int SASL_JAAS_CONFIG_FLAG = CommonConfigurations.getIntProperty("sasl.jaas.config.flag");
public static void main(String[] args) {
StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
// 配置加密解密的密码/salt值
encryptor.setPassword("galaxy");
// 对"raw_password"进行加密S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p
// String password = "galaxy2019";
String password = "nacos";
String encPwd = encryptor.encrypt(password);
System.out.println(encPwd);
// 再进行解密raw_password
String rawPwd = encryptor.decrypt(encPwd);
System.out.println(rawPwd);
}
}

View File

@@ -0,0 +1,63 @@
package com.zdjizhi.common;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Objects;
public class DosBaselineThreshold implements Serializable {
private ArrayList<Integer> session_rate;
private Integer session_rate_baseline_type;
private Integer session_rate_default_value;
public ArrayList<Integer> getSession_rate() {
return session_rate;
}
public void setSession_rate(ArrayList<Integer> session_rate) {
this.session_rate = session_rate;
}
public Integer getSession_rate_baseline_type() {
return session_rate_baseline_type;
}
public void setSession_rate_baseline_type(Integer session_rate_baseline_type) {
this.session_rate_baseline_type = session_rate_baseline_type;
}
public Integer getSession_rate_default_value() {
return session_rate_default_value;
}
public void setSession_rate_default_value(Integer session_rate_default_value) {
this.session_rate_default_value = session_rate_default_value;
}
@Override
public String toString() {
return "DosBaselineThreshold{" +
"session_rate=" + session_rate +
", session_rate_baseline_type=" + session_rate_baseline_type +
", session_rate_default_value=" + session_rate_default_value +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DosBaselineThreshold)) {
return false;
}
DosBaselineThreshold that = (DosBaselineThreshold) o;
return Objects.equals(getSession_rate(), that.getSession_rate()) &&
Objects.equals(getSession_rate_baseline_type(), that.getSession_rate_baseline_type()) &&
Objects.equals(getSession_rate_default_value(), that.getSession_rate_default_value());
}
@Override
public int hashCode() {
return Objects.hash(getSession_rate(), getSession_rate_baseline_type(), getSession_rate_default_value());
}
}

View File

@@ -0,0 +1,121 @@
package com.zdjizhi.common;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Objects;
/**
* @author wlh
*/
public class DosDetectionThreshold implements Serializable {
private String profileId;
private String attackType;
private ArrayList<String> serverIpList;
private String serverIpAddr;
private long packetsPerSec;
private long bitsPerSec;
private long sessionsPerSec;
private int isValid;
@Override
public String toString() {
return "DosDetectionThreshold{" +
"profileId='" + profileId + '\'' +
", attackType='" + attackType + '\'' +
", serverIpList=" + serverIpList +
", serverIpAddr='" + serverIpAddr + '\'' +
", packetsPerSec=" + packetsPerSec +
", bitsPerSec=" + bitsPerSec +
", sessionsPerSec=" + sessionsPerSec +
", isValid=" + isValid +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DosDetectionThreshold threshold = (DosDetectionThreshold) o;
return packetsPerSec == threshold.packetsPerSec &&
bitsPerSec == threshold.bitsPerSec &&
sessionsPerSec == threshold.sessionsPerSec &&
isValid == threshold.isValid &&
Objects.equals(profileId, threshold.profileId) &&
Objects.equals(attackType, threshold.attackType) &&
Objects.equals(serverIpList, threshold.serverIpList) &&
Objects.equals(serverIpAddr, threshold.serverIpAddr);
}
@Override
public int hashCode() {
return Objects.hash(profileId, attackType, serverIpList, serverIpAddr, packetsPerSec, bitsPerSec, sessionsPerSec, isValid);
}
public String getProfileId() {
return profileId;
}
public void setProfileId(String profileId) {
this.profileId = profileId;
}
public String getAttackType() {
return attackType;
}
public void setAttackType(String attackType) {
this.attackType = attackType;
}
public ArrayList<String> getServerIpList() {
return serverIpList;
}
public void setServerIpList(ArrayList<String> serverIpList) {
this.serverIpList = serverIpList;
}
public String getServerIpAddr() {
return serverIpAddr;
}
public void setServerIpAddr(String serverIpAddr) {
this.serverIpAddr = serverIpAddr;
}
public long getPacketsPerSec() {
return packetsPerSec;
}
public void setPacketsPerSec(long packetsPerSec) {
this.packetsPerSec = packetsPerSec;
}
public long getBitsPerSec() {
return bitsPerSec;
}
public void setBitsPerSec(long bitsPerSec) {
this.bitsPerSec = bitsPerSec;
}
public long getSessionsPerSec() {
return sessionsPerSec;
}
public void setSessionsPerSec(long sessionsPerSec) {
this.sessionsPerSec = sessionsPerSec;
}
public int getIsValid() {
return isValid;
}
public void setIsValid(int isValid) {
this.isValid = isValid;
}
}

View File

@@ -1,6 +1,7 @@
package com.zdjizhi.common;
import java.io.Serializable;
import java.util.Objects;
public class DosEventLog implements Serializable {
@@ -140,4 +141,33 @@ public class DosEventLog implements Serializable {
", bit_rate=" + bit_rate +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DosEventLog)) {
return false;
}
DosEventLog that = (DosEventLog) o;
return getLog_id() == that.getLog_id() &&
getStart_time() == that.getStart_time() &&
getEnd_time() == that.getEnd_time() &&
getSession_rate() == that.getSession_rate() &&
getPacket_rate() == that.getPacket_rate() &&
getBit_rate() == that.getBit_rate() &&
Objects.equals(getAttack_type(), that.getAttack_type()) &&
Objects.equals(getSeverity(), that.getSeverity()) &&
Objects.equals(getConditions(), that.getConditions()) &&
Objects.equals(getDestination_ip(), that.getDestination_ip()) &&
Objects.equals(getDestination_country(), that.getDestination_country()) &&
Objects.equals(getSource_ip_list(), that.getSource_ip_list()) &&
Objects.equals(getSource_country_list(), that.getSource_country_list());
}
@Override
public int hashCode() {
return Objects.hash(getLog_id(), getStart_time(), getEnd_time(), getAttack_type(), getSeverity(), getConditions(), getDestination_ip(), getDestination_country(), getSource_ip_list(), getSource_country_list(), getSession_rate(), getPacket_rate(), getBit_rate());
}
}

View File

@@ -1,17 +1,25 @@
package com.zdjizhi.common;
import java.io.Serializable;
import java.util.Objects;
public class DosMetricsLog implements Serializable {
private long sketch_start_time;
private String common_sled_ip;
private String common_data_center;
private String attack_type;
private String destination_ip;
private long session_rate;
private long packet_rate;
private long bit_rate;
private int partition_num;
public int getPartition_num() {
return partition_num;
}
public void setPartition_num(int partition_num) {
this.partition_num = partition_num;
}
public long getSketch_start_time() {
return sketch_start_time;
@@ -21,22 +29,6 @@ public class DosMetricsLog implements Serializable {
this.sketch_start_time = sketch_start_time;
}
public String getCommon_sled_ip() {
return common_sled_ip;
}
public void setCommon_sled_ip(String common_sled_ip) {
this.common_sled_ip = common_sled_ip;
}
public String getCommon_data_center() {
return common_data_center;
}
public void setCommon_data_center(String common_data_center) {
this.common_data_center = common_data_center;
}
public String getAttack_type() {
return attack_type;
}
@@ -81,13 +73,35 @@ public class DosMetricsLog implements Serializable {
public String toString() {
return "DosMetricsLog{" +
"sketch_start_time=" + sketch_start_time +
", common_sled_ip='" + common_sled_ip + '\'' +
", common_data_center='" + common_data_center + '\'' +
", attack_type='" + attack_type + '\'' +
", destination_ip='" + destination_ip + '\'' +
", session_rate=" + session_rate +
", packet_rate=" + packet_rate +
", bit_rate=" + bit_rate +
", partition_num=" + partition_num +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DosMetricsLog)) {
return false;
}
DosMetricsLog that = (DosMetricsLog) o;
return getSketch_start_time() == that.getSketch_start_time() &&
getSession_rate() == that.getSession_rate() &&
getPacket_rate() == that.getPacket_rate() &&
getBit_rate() == that.getBit_rate() &&
getPartition_num() == that.getPartition_num() &&
Objects.equals(getAttack_type(), that.getAttack_type()) &&
Objects.equals(getDestination_ip(), that.getDestination_ip());
}
@Override
public int hashCode() {
return Objects.hash(getSketch_start_time(), getAttack_type(), getDestination_ip(), getSession_rate(), getPacket_rate(), getBit_rate(), getPartition_num());
}
}

View File

@@ -1,6 +1,7 @@
package com.zdjizhi.common;
import java.io.Serializable;
import java.util.Objects;
public class DosSketchLog implements Serializable {
@@ -31,6 +32,32 @@ public class DosSketchLog implements Serializable {
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DosSketchLog)) {
return false;
}
DosSketchLog sketchLog = (DosSketchLog) o;
return getSketch_start_time() == sketchLog.getSketch_start_time() &&
getSketch_duration() == sketchLog.getSketch_duration() &&
getSketch_sessions() == sketchLog.getSketch_sessions() &&
getSketch_packets() == sketchLog.getSketch_packets() &&
getSketch_bytes() == sketchLog.getSketch_bytes() &&
Objects.equals(getCommon_sled_ip(), sketchLog.getCommon_sled_ip()) &&
Objects.equals(getCommon_data_center(), sketchLog.getCommon_data_center()) &&
Objects.equals(getAttack_type(), sketchLog.getAttack_type()) &&
Objects.equals(getSource_ip(), sketchLog.getSource_ip()) &&
Objects.equals(getDestination_ip(), sketchLog.getDestination_ip());
}
@Override
public int hashCode() {
return Objects.hash(getCommon_sled_ip(), getCommon_data_center(), getSketch_start_time(), getSketch_duration(), getAttack_type(), getSource_ip(), getDestination_ip(), getSketch_sessions(), getSketch_packets(), getSketch_bytes());
}
public String getCommon_sled_ip() {
return common_sled_ip;
}

View File

@@ -0,0 +1,22 @@
package com.zdjizhi.common;
import java.util.Objects;
public class DosVsysId {
private int vsysId;
public int getVsysId() {
return vsysId;
}
public void setVsysId(int vsysId) {
this.vsysId = vsysId;
}
@Override
public String toString() {
return "DosVsysId{" +
"vsysId=" + vsysId +
'}';
}
}

View File

@@ -1,114 +1,158 @@
package com.zdjizhi.etl;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.sink.OutputStreamSink;
import com.zdjizhi.utils.IpUtils;
import com.zdjizhi.utils.SnowflakeId;
import com.zdjizhi.common.*;
import com.zdjizhi.utils.*;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author wlh
* DoS检测判断逻辑
*/
public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<String, Map<String, List<Integer>>>, DosEventLog> {
public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
private static final Logger logger = LoggerFactory.getLogger(DosDetection.class);
private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
private HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap;
private final static int BASELINE_SIZE = 144;
private final static int STATIC_CONDITION_TYPE = 1;
private final static int BASELINE_CONDITION_TYPE = 2;
private final static int SENSITIVITY_CONDITION_TYPE = 3;
private static MapStateDescriptor<String, Map<String, Map<String, List<Integer>>>> descriptor = new MapStateDescriptor<>("boradcast-state",
Types.STRING,
new MapTypeInfo<>(String.class, new MapTypeInfo<>(String.class, (Class<List<Integer>>) (Class<?>) List.class).getTypeClass()));
private final static String SESSIONS_TAG = "sessions";
private final static String PACKETS_TAG = "packets";
private final static String BITS_TAG = "bits";
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
private final static int OTHER_BASELINE_TYPE = 3;
@Override
public void open(Configuration parameters) {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build());
try {
executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0,
CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
executorService.scheduleAtFixedRate(() -> baselineMap = ParseBaselineThreshold.readFromHbase(), 0,
CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
} catch (Exception e) {
logger.error("定时器任务执行失败", e);
}
PERCENT_INSTANCE.setMinimumFractionDigits(2);
}
@Override
public void processElement(DosSketchLog value, ReadOnlyContext ctx, Collector<DosEventLog> out) throws Exception {
public DosEventLog map(DosSketchLog value) {
DosEventLog finalResult = null;
try {
Map<String, Map<String, List<Integer>>> broadcast = ctx.getBroadcastState(descriptor).get("broadcast-state");
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
logger.info("当前判断IP{}, 类型: {}",destinationIp,attackType);
if (broadcast.containsKey(destinationIp)){
List<Integer> baseline = broadcast.get(destinationIp).get(attackType);
if (baseline != null && baseline.size() == BASELINE_SIZE){
int timeIndex = getCurrentTimeIndex(value.getSketch_start_time());
Integer base = baseline.get(timeIndex);
long sketchSessions = value.getSketch_sessions();
long diff = sketchSessions - base;
if (diff > 0){
String percent = getDiffPercent(diff, sketchSessions);
double diffPercentDouble = getDiffPercentDouble(percent);
Severity severity = judgeSeverity(diffPercentDouble);
if (severity != Severity.NORMAL){
DosEventLog result = getResult(value, severity, percent);
logger.info("检测到当前server IP {} 存在 {} 异常,日志详情\n {}",destinationIp,attackType,result.toString());
out.collect(result);
}else {
logger.info("当前server IP{} 未出现 {} 异常,日志详情 {}",destinationIp,attackType,value.toString());
}
}
}
}else {
logger.info("未获取到当前server IP{} 类型 {} baseline数据",destinationIp,attackType);
IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress();
DosDetectionThreshold threshold = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create()).get(destinationIpAddress);
logger.debug("当前判断IP{}, 类型: {}", destinationIp, attackType);
if (threshold == null && baselineMap.containsKey(destinationIp)) {
finalResult = getDosEventLogByBaseline(value);
} else if (threshold == null && !baselineMap.containsKey(destinationIp)) {
finalResult = getDosEventLogBySensitivityThreshold(value);
} else if (threshold != null) {
finalResult = getDosEventLogByStaticThreshold(value, threshold);
} else {
logger.debug("未获取到当前server IP{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType);
}
}catch (Exception e){
logger.error("判定失败\n {} \n{}",value,e);
} catch (Exception e) {
logger.error("判定失败\n {} \n{}", value, e);
}
return finalResult;
}
@Override
public void processBroadcastElement(Map<String, Map<String, List<Integer>>> value, Context ctx, Collector<DosEventLog> out) {
try {
ctx.getBroadcastState(descriptor).put("broadcast-state", value);
}catch (Exception e){
logger.error("更新广播状态失败 {}",e);
private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
DosEventLog result = null;
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) {
long diff = sketchSessions - NacosUtils.getIntProperty("static.sensitivity.threshold");
result = getDosEventLog(value, NacosUtils.getIntProperty("static.sensitivity.threshold"), diff, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
result.setSeverity(Severity.MAJOR.severity);
}
return result;
}
public static void main(String[] args) {
DosDetection dosDetection = new DosDetection();
// HashSet<String> strings = new HashSet<>();
// strings.add("13.46.241.36");
// strings.add("25.46.241.45");
// strings.add("133.46.241.53");
// strings.add("219.46.242.74");
// strings.add("153.146.241.196");
// strings.add("132.46.241.21");
// String join = StringUtils.join(strings, ",");
// System.out.println(IpUtils.ipLookup.countryLookup("192.168.50.150"));
System.out.println(Severity.CRITICAL.severity);
private DosEventLog getDosEventLogByBaseline(DosSketchLog value) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) {
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(dosBaselineThreshold, value);
long diff = sketchSessions - base;
result = getDosEventLog(value, base, diff, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
}
return result;
}
private DosEventLog getResult(DosSketchLog value,Severity severity,String percent){
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) {
long base = threshold.getSessionsPerSec();
long diff = value.getSketch_sessions() - base;
DosEventLog result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, SESSIONS_TAG);
if (result == null) {
base = threshold.getPacketsPerSec();
diff = value.getSketch_packets() - base;
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, PACKETS_TAG);
if (result == null) {
base = threshold.getBitsPerSec();
diff = value.getSketch_bytes() - base;
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, BITS_TAG);
}
}
return result;
}
private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff, int type, String tag) {
DosEventLog result = null;
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
Severity severity = judgeSeverity(percent);
if (severity != Severity.NORMAL) {
if (type == BASELINE_CONDITION_TYPE && percent < NacosUtils.getDoubleProperty("baseline.sensitivity.threshold")) {
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
} else {
result = getResult(value, base, severity, percent+1, type, tag);
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result);
}
} else {
logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value);
}
}
return result;
}
private DosEventLog getResult(DosSketchLog value, long base, Severity severity, double percent, int type, String tag) {
DosEventLog dosEventLog = new DosEventLog();
dosEventLog.setLog_id(SnowflakeId.generateId());
dosEventLog.setStart_time(value.getSketch_start_time());
dosEventLog.setEnd_time(value.getSketch_start_time()+CommonConfig.FLINK_WINDOW_MAX_TIME);
dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
dosEventLog.setAttack_type(value.getAttack_type());
dosEventLog.setSeverity(severity.toString());
dosEventLog.setConditions(getConditions(percent));
dosEventLog.setSeverity(severity.severity);
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag));
dosEventLog.setDestination_ip(value.getDestination_ip());
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
String ipList = value.getSource_ip();
@@ -120,47 +164,114 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str
return dosEventLog;
}
private String getConditions(String percent){
return "sessions > "+percent+" of baseline";
}
private String getSourceCountryList(String sourceIpList){
String[] ipArr = sourceIpList.split(",");
HashSet<String> countrySet = new HashSet<>();
for (String ip:ipArr){
countrySet.add(IpUtils.ipLookup.countryLookup(ip));
private Integer getBaseValue(DosBaselineThreshold dosBaselineThreshold, DosSketchLog value) {
Integer base = 0;
try {
if (dosBaselineThreshold != null) {
ArrayList<Integer> baselines = dosBaselineThreshold.getSession_rate();
Integer defaultVaule = dosBaselineThreshold.getSession_rate_default_value();
Integer sessionRateBaselineType = dosBaselineThreshold.getSession_rate_baseline_type();
if (baselines != null && baselines.size() == BASELINE_SIZE) {
int timeIndex = getCurrentTimeIndex(value.getSketch_start_time());
base = baselines.get(timeIndex);
if (base == 0) {
logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}", value.getDestination_ip(), value.getAttack_type(), defaultVaule);
base = defaultVaule;
}
if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < NacosUtils.getIntProperty("static.sensitivity.threshold")){
base = NacosUtils.getIntProperty("static.sensitivity.threshold");
}
}
}
} catch (Exception e) {
logger.error("解析baseline数据失败,返回默认值0", e);
}
return StringUtils.join(countrySet,",");
return base;
}
private int getCurrentTimeIndex(long sketchStartTime){
long currentDayTime = sketchStartTime / (60 * 60 * 24) * 60 * 60 * 24;
long indexLong = (sketchStartTime - currentDayTime) / 600;
return Integer.parseInt(Long.toString(indexLong));
private String getConditions(String percent, long base, long sessions, int type, String tag) {
switch (type) {
case STATIC_CONDITION_TYPE:
return new StrBuilder()
.append("Rate > ")
.append(base).append(" ")
.append(tag).append("/s")
.toString();
case BASELINE_CONDITION_TYPE:
return new StrBuilder()
.append(tag).append(" > ")
.append(percent).append(" of baseline")
.toString();
case SENSITIVITY_CONDITION_TYPE:
return new StrBuilder()
.append(sessions).append(" ")
.append(tag).append("/s Unusually high ")
.append(StringUtils.capitalize(tag))
.toString();
default:
throw new IllegalArgumentException("Illegal Argument type:" + type + ", known types = [1,2,3]");
}
}
private String getDiffPercent(long diff,long sketchSessions){
double diffDou = Double.parseDouble(Long.toString(diff));
double sessDou = Double.parseDouble(Long.toString(sketchSessions));
return PERCENT_INSTANCE.format(diffDou / sessDou);
private String getSourceCountryList(String sourceIpList) {
if (StringUtil.isNotBlank(sourceIpList)) {
String countryList;
try {
String[] ipArr = sourceIpList.split(",");
HashSet<String> countrySet = new HashSet<>();
for (String ip : ipArr) {
String country = IpUtils.ipLookup.countryLookup(ip);
if (StringUtil.isNotBlank(country)){
countrySet.add(country);
}
}
countryList = StringUtils.join(countrySet, ", ");
return countryList;
} catch (Exception e) {
logger.error("{} source IP lists 获取国家失败", sourceIpList, e);
return StringUtil.EMPTY;
}
} else {
throw new IllegalArgumentException("Illegal Argument sourceIpList = null");
}
}
private double getDiffPercentDouble(String diffPercent) throws ParseException {
return PERCENT_INSTANCE.parse(diffPercent).doubleValue();
private int getCurrentTimeIndex(long sketchStartTime) {
int index = 0;
try {
long currentDayTime = DateUtils.getTimeFloor(new Date(sketchStartTime * 1000L), "P1D").getTime() / 1000;
long indexLong = (sketchStartTime - currentDayTime) / (86400 / BASELINE_SIZE);
index = Integer.parseInt(Long.toString(indexLong));
} catch (Exception e) {
logger.error("获取time index失败", e);
}
return index;
}
private Severity judgeSeverity(double diffPercent){
if (diffPercent >= CommonConfig.BASELINE_SESSIONS_MINOR_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_WARNING_THRESHOLD){
public static void main(String[] args) {
// System.out.println(new DosDetection().getSourceCountryList("192.0.2.3,138.199.14.31,255.255.255.255,121.14.89.209," +
// "23.200.74.224,161.117.68.253"));
// DosDetection dosDetection = new DosDetection();
// System.out.println(dosDetection.judgeSeverity(dosDetection.getDiffPercent(499, 1000)));
}
private Double getDiffPercent(long diff, long base) {
return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
}
private Severity judgeSeverity(double diffPercent) {
if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold")) {
return Severity.MINOR;
}else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_WARNING_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD){
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.major.threshold")) {
return Severity.WARNING;
}else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD){
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.major.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold")) {
return Severity.MAJOR;
}else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD){
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold")) {
return Severity.SEVERE;
}else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD){
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold")) {
return Severity.CRITICAL;
}else {
} else {
return Severity.NORMAL;
}
}
@@ -188,4 +299,5 @@ public class DosDetection extends BroadcastProcessFunction<DosSketchLog, Map<Str
}
}
}

View File

@@ -3,7 +3,7 @@ package com.zdjizhi.etl;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosSketchLog;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -18,18 +18,21 @@ import static com.zdjizhi.sink.OutputStreamSink.outputTag;
/**
* @author 94976
*/
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple4<String,String,String,String>, TimeWindow> {
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String,String>, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
private static final String EMPTY_SOURCE_IP_IPV6 = "::";
@Override
public void process(Tuple4<String,String, String, String> keys,
public void process(Tuple2<String, String> keys,
Context context, Iterable<DosSketchLog> elements,
Collector<DosSketchLog> out) {
DosSketchLog middleResult = getMiddleResult(keys, elements);
try {
if (middleResult != null){
out.collect(middleResult);
logger.info("获取中间聚合结果:{}",middleResult.toString());
logger.debug("获取中间聚合结果:{}",middleResult.toString());
context.output(outputTag,TrafficServerIpMetrics.getOutputMetric(middleResult));
}
}catch (Exception e){
@@ -37,16 +40,14 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
}
}
private DosSketchLog getMiddleResult(Tuple4<String,String, String, String> keys,Iterable<DosSketchLog> elements){
private DosSketchLog getMiddleResult(Tuple2<String, String> keys,Iterable<DosSketchLog> elements){
DosSketchLog midResuleLog = new DosSketchLog();
Tuple6<Long, Long, Long,String,Long,Long> values = sketchAggregate(elements);
try {
if (values != null){
midResuleLog.setCommon_sled_ip(keys.f0);
midResuleLog.setCommon_data_center(keys.f1);
midResuleLog.setDestination_ip(keys.f3);
midResuleLog.setAttack_type(keys.f2);
midResuleLog.setAttack_type(keys.f0);
midResuleLog.setDestination_ip(keys.f1);
midResuleLog.setSketch_start_time(values.f4);
midResuleLog.setSketch_duration(values.f5);
midResuleLog.setSource_ip(values.f3);
@@ -62,27 +63,32 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
}
private Tuple6<Long, Long, Long,String,Long,Long> sketchAggregate(Iterable<DosSketchLog> elements){
int cnt = 1;
long sessions = 0;
long packets = 0 ;
long bytes = 0;
long startTime = 0;
long startTime = System.currentTimeMillis()/1000;
long endTime = System.currentTimeMillis()/1000;
long duration = 0;
HashSet<String> sourceIpSet = new HashSet<>();
try {
for (DosSketchLog newSketchLog : elements){
sessions += newSketchLog.getSketch_sessions();
packets += newSketchLog.getSketch_packets();
bytes += newSketchLog.getSketch_bytes();
startTime = newSketchLog.getSketch_start_time();
duration = newSketchLog.getSketch_duration();
cnt += 1;
if (sourceIpSet.size() < CommonConfig.SOURCE_IP_LIST_LIMIT){
sourceIpSet.add(newSketchLog.getSource_ip());
String sourceIp = newSketchLog.getSource_ip();
if (StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV6)){
sessions += newSketchLog.getSketch_sessions();
packets += newSketchLog.getSketch_packets();
bytes += newSketchLog.getSketch_bytes();
startTime = newSketchLog.getSketch_start_time() > startTime ? startTime : newSketchLog.getSketch_start_time();
endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime;
duration = endTime - startTime == 0 ? 5 : endTime - startTime;
}else {
if (sourceIpSet.size() < CommonConfig.SOURCE_IP_LIST_LIMIT){
sourceIpSet.add(sourceIp);
}
}
}
String sourceIpList = StringUtils.join(sourceIpSet, ",");
return Tuple6.of(sessions/cnt,packets/cnt,bytes/cnt,sourceIpList,startTime,duration);
return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME,
bytes*8/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
}catch (Exception e){
logger.error("聚合中间结果集失败 {}",e);
}

View File

@@ -0,0 +1,109 @@
package com.zdjizhi.etl;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosBaselineThreshold;
import com.zdjizhi.utils.DateUtils;
import com.zdjizhi.utils.HbaseUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
public class ParseBaselineThreshold {
private static final Logger logger = LoggerFactory.getLogger(ParseBaselineThreshold.class);
private static ArrayList<String> floodTypeList = new ArrayList<>();
private static Table table = null;
private static Scan scan = null;
static {
floodTypeList.add("TCP SYN Flood");
floodTypeList.add("UDP Flood");
floodTypeList.add("ICMP Flood");
floodTypeList.add("DNS Flood");
}
private static void prepareHbaseEnv() throws IOException {
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
config.set("hbase.client.retries.number", "3");
config.set("hbase.bulkload.retries.number", "3");
config.set("zookeeper.recovery.retry", "3");
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
Connection conn = ConnectionFactory.createConnection(config);
table = conn.getTable(tableName);
long currentTimeMillis = System.currentTimeMillis();
scan = new Scan()
.setAllowPartialResults(true)
.setTimeRange(DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(CommonConfig.HBASE_BASELINE_TTL)).getTime(), currentTimeMillis)
.setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
logger.info("连接hbase成功正在读取baseline数据");
}
static Map<String, Map<String, DosBaselineThreshold>> readFromHbase() {
Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
try {
prepareHbaseEnv();
logger.info("开始读取baseline数据");
ResultScanner rs = table.getScanner(scan);
for (Result result : rs) {
Map<String, DosBaselineThreshold> floodTypeMap = new HashMap<>();
String rowkey = Bytes.toString(result.getRow());
for (String type:floodTypeList){
DosBaselineThreshold baselineThreshold = new DosBaselineThreshold();
ArrayList<Integer> sessionRate = HbaseUtils.getArraylist(result, type, "session_rate");
if (sessionRate != null && !sessionRate.isEmpty()){
Integer defaultValue = HbaseUtils.getIntegerValue(result, type, "session_rate_default_value");
Integer rateBaselineType = HbaseUtils.getIntegerValue(result, type, "session_rate_baseline_type");
baselineThreshold.setSession_rate(sessionRate);
baselineThreshold.setSession_rate_default_value(defaultValue);
baselineThreshold.setSession_rate_baseline_type(rateBaselineType);
floodTypeMap.put(type,baselineThreshold);
}
}
baselineMap.put(rowkey, floodTypeMap);
}
logger.info("格式化baseline数据成功读取IP共{}", baselineMap.size());
} catch (Exception e) {
logger.error("读取hbase数据失败", e);
}
return baselineMap;
}
public static void main(String[] args) {
long currentTimeMillis = System.currentTimeMillis();
long p200D = DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(CommonConfig.HBASE_BASELINE_TTL)).getTime();
System.out.println(p200D);
System.out.println(currentTimeMillis);
System.out.println(currentTimeMillis - p200D);
Map<String, Map<String, DosBaselineThreshold>> baselineMap = readFromHbase();
Set<String> keySet = baselineMap.keySet();
for (String key : keySet) {
Map<String, DosBaselineThreshold> stringTuple2Map = baselineMap.get(key);
Set<String> strings = stringTuple2Map.keySet();
for (String s:strings){
DosBaselineThreshold dosBaselineThreshold = stringTuple2Map.get(s);
System.out.println(key+"---"+s+"---"+dosBaselineThreshold);
}
}
System.out.println(baselineMap.size());
}
}

View File

@@ -1,5 +1,6 @@
package com.zdjizhi.etl;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.source.DosSketchSource;
@@ -17,9 +18,16 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
/**
* @author wlh
*/
public class ParseSketchLog {
private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class);
public static SingleOutputStreamOperator<DosSketchLog> getSketchSource(){
return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy());
@@ -37,20 +45,16 @@ public class ParseSketchLog {
private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
@Override
public void flatMap(String s, Collector<DosSketchLog> collector) throws Exception {
public void flatMap(String s, Collector<DosSketchLog> collector) {
try {
if (StringUtil.isNotBlank(s)){
HashMap<String, Object> sketchSource = (HashMap<String, Object>) JsonMapper.fromJsonString(s, Object.class);
String commonSledIp = sketchSource.get("common_sled_ip").toString();
String commonDataCenter = sketchSource.get("common_data_center").toString();
HashMap<String, Object> sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType);
long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
String attackType = sketchSource.get("attack_type").toString();
ArrayList<HashMap<String, Object>> reportIpList = (ArrayList<HashMap<String, Object>>) sketchSource.get("report_ip_list");
ArrayList<HashMap<String, Object>> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType);
for (HashMap<String, Object> obj : reportIpList) {
DosSketchLog dosSketchLog = new DosSketchLog();
dosSketchLog.setCommon_sled_ip(commonSledIp);
dosSketchLog.setCommon_data_center(commonDataCenter);
dosSketchLog.setSketch_start_time(sketchStartTime);
dosSketchLog.setSketch_duration(sketchDuration);
dosSketchLog.setAttack_type(attackType);
@@ -61,11 +65,11 @@ public class ParseSketchLog {
long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
dosSketchLog.setSource_ip(sourceIp);
dosSketchLog.setDestination_ip(destinationIp);
dosSketchLog.setSketch_sessions(sketchSessions/sketchDuration);
dosSketchLog.setSketch_packets(sketchPackets/sketchDuration);
dosSketchLog.setSketch_bytes(sketchBytes*8/sketchDuration);
dosSketchLog.setSketch_sessions(sketchSessions);
dosSketchLog.setSketch_packets(sketchPackets);
dosSketchLog.setSketch_bytes(sketchBytes);
collector.collect(dosSketchLog);
logger.info("数据解析成功:{}",dosSketchLog.toString());
logger.debug("数据解析成功:{}",dosSketchLog.toString());
}
}
} catch (Exception e) {

View File

@@ -0,0 +1,275 @@
package com.zdjizhi.etl;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosDetectionThreshold;
import com.zdjizhi.common.DosVsysId;
import com.zdjizhi.utils.HttpClientUtils;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.NacosUtils;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.message.BasicHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
/**
* @author wlh
*/
public class ParseStaticThreshold {
private static Logger logger = LoggerFactory.getLogger(ParseStaticThreshold.class);
private static String encryptpwd;
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
static {
//加载加密登录密码
encryptpwd = getEncryptpwd();
}
/**
* 获取加密密码
*/
private static String getEncryptpwd() {
String psw = HttpClientUtils.ERROR_MESSAGE;
try {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("password", "admin");
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build());
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
psw = data.get("encryptpwd").toString();
} else {
logger.error(msg);
}
}
} catch (URISyntaxException e) {
logger.error("构造URI异常", e);
} catch (Exception e) {
logger.error("获取encryptpwd失败", e);
}
return psw;
}
/**
* 登录bifang服务获取token
*
* @return token
*/
private static String loginBifangServer() {
String token = HttpClientUtils.ERROR_MESSAGE;
try {
if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)) {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("username", "admin");
parms.put("password", encryptpwd);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms);
String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
token = data.get("token").toString();
} else {
logger.error(msg);
}
}
}
} catch (Exception e) {
logger.error("登录失败,未获取到token ", e);
}
return token;
}
/**
* 获取vsysId配置列表
*
* @return vsysIdList
*/
private static ArrayList<DosVsysId> getVsysId() {
ArrayList<DosVsysId> vsysIdList = null;
try {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("pageSize", -1);
parms.put("orderBy", "vsysId desc");
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms);
String token = NacosUtils.getStringProperty("bifang.server.token");
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
Object list = data.get("list");
if (list != null) {
vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
logger.info("获取到vsysId{}条", vsysIdList.size());
} else {
logger.warn("vsysIdList为空");
}
} else {
logger.error(msg);
}
}
}
} catch (Exception e) {
logger.error("获取vsysId失败,请检查bifang服务或登录配置信息 ", e);
}
return vsysIdList;
}
/**
* 根据vsysId获取静态阈值配置列表
*
* @return thresholds
*/
private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
ArrayList<DosDetectionThreshold> thresholds = null;
// ArrayList<DosVsysId> vsysId = getVsysId();
try {
// if (vsysId != null){
// for (DosVsysId dosVsysId : vsysId) {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("pageSize", -1);
parms.put("orderBy", "profileId asc");
parms.put("isValid", 1);
// parms.put("vsysId", dosVsysId.getVsysId());
parms.put("vsysId", 1);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
String token = NacosUtils.getStringProperty("bifang.server.token");
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
Object list = data.get("list");
if (list != null) {
thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
logger.info("获取到静态阈值配置{}条", thresholds.size());
} else {
logger.warn("静态阈值配置为空");
}
} else {
logger.error(msg);
}
}
}
// }
// }
} catch (Exception e) {
logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e);
}
return thresholds;
}
/**
* 基于静态阈值构建threshold RangeMapk:IP段或具体IPv:配置信息
*
* @return threshold RangeMap
*/
static HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> createStaticThreshold() {
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> thresholdRangeMap = new HashMap<>(4);
try {
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) {
for (DosDetectionThreshold threshold : dosDetectionThreshold) {
String attackType = threshold.getAttackType();
TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = thresholdRangeMap.getOrDefault(attackType, TreeRangeMap.create());
ArrayList<String> serverIpList = threshold.getServerIpList();
for (String sip : serverIpList) {
IPAddressString ipAddressString = new IPAddressString(sip);
if (ipAddressString.isIPAddress()) {
IPAddress address = ipAddressString.getAddress();
if (address.isPrefixed()) {
IPAddress lower = address.getLower();
IPAddress upper = address.getUpper();
if (!address.isMultiple()) {
lower = address.adjustPrefixLength(address.getBitCount());
upper = address.toMaxHost().withoutPrefixLength();
}
Map.Entry<Range<IPAddress>, DosDetectionThreshold> lowerEntry = treeRangeMap.getEntry(lower);
Map.Entry<Range<IPAddress>, DosDetectionThreshold> upperEntry = treeRangeMap.getEntry(upper);
if (lowerEntry != null && upperEntry == null) {
Range<IPAddress> lowerEntryKey = lowerEntry.getKey();
DosDetectionThreshold lowerEntryValue = lowerEntry.getValue();
treeRangeMap.put(Range.closedOpen(lowerEntryKey.lowerEndpoint(), lower), lowerEntryValue);
treeRangeMap.put(Range.closed(lower, upper), threshold);
} else if (lowerEntry == null && upperEntry != null) {
Range<IPAddress> upperEntryKey = upperEntry.getKey();
DosDetectionThreshold upperEntryValue = upperEntry.getValue();
treeRangeMap.put(Range.openClosed(upper, upperEntryKey.upperEndpoint()), upperEntryValue);
treeRangeMap.put(Range.closed(lower, upper), threshold);
} else {
treeRangeMap.put(Range.closed(lower, upper), threshold);
}
} else {
treeRangeMap.put(Range.closed(address, address), threshold);
}
}
}
thresholdRangeMap.put(attackType, treeRangeMap);
}
}
} catch (Exception e) {
logger.error("构建threshold RangeMap失败", e);
}
return thresholdRangeMap;
}
public static void main(String[] args) {
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
dosDetectionThreshold.forEach(System.out::println);
System.out.println("------------------------");
HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> staticThreshold = createStaticThreshold();
System.out.println("------------------------");
for (String type : staticThreshold.keySet()) {
Map<Range<IPAddress>, DosDetectionThreshold> asMapOfRanges = staticThreshold.get(type).asMapOfRanges();
for (Range<IPAddress> range : asMapOfRanges.keySet()) {
DosDetectionThreshold threshold = asMapOfRanges.get(range);
System.out.println(type + "---" + range + "---" + threshold);
}
System.out.println("------------------------");
}
// String s = loginBifangServer();
// System.out.println(s);
}
}

View File

@@ -14,14 +14,13 @@ class TrafficServerIpMetrics {
static DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) {
DosMetricsLog dosMetricsLog = new DosMetricsLog();
dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis()/1000));
dosMetricsLog.setCommon_sled_ip(midResuleLog.getCommon_sled_ip());
dosMetricsLog.setCommon_data_center(midResuleLog.getCommon_data_center());
dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip());
dosMetricsLog.setAttack_type(midResuleLog.getAttack_type());
dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions());
dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets());
dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes());
logger.info("metric 结果已加载:{}",dosMetricsLog.toString());
dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip()));
logger.debug("metric 结果已加载:{}",dosMetricsLog.toString());
return dosMetricsLog;
}
@@ -29,4 +28,13 @@ class TrafficServerIpMetrics {
return sketchStartTime / CommonConfig.FLINK_WINDOW_MAX_TIME * CommonConfig.FLINK_WINDOW_MAX_TIME;
}
private static int getPartitionNumByIp(String destinationIp){
return Math.abs(destinationIp.hashCode()) % CommonConfig.DESTINATION_IP_PARTITION_NUM;
}
public static void main(String[] args) {
System.out.println(getPartitionNumByIp("146.177.223.43"));
System.out.println("146.177.223.43".hashCode());
}
}

View File

@@ -2,6 +2,10 @@ package com.zdjizhi.main;
import com.zdjizhi.sink.OutputStreamSink;
/**
* @author wlh
* 程序主类入口
*/
public class DosDetectionApplication {
public static void main(String[] args) {

View File

@@ -6,10 +6,15 @@ import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.util.Objects;
class DosEventSink {
static void dosEventOutputSink(SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream){
dosEventLogOutputStream.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
dosEventLogOutputStream
.filter(Objects::nonNull)
.map(JsonMapper::toJsonString)
.addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
.setParallelism(CommonConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
}

View File

@@ -4,19 +4,12 @@ import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosMetricsLog;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.etl.EtlProcessFunction;
import com.zdjizhi.etl.DosDetection;
import com.zdjizhi.etl.EtlProcessFunction;
import com.zdjizhi.etl.ParseSketchLog;
import com.zdjizhi.source.BaselineSource;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -24,8 +17,6 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* @author 94976
*/
@@ -34,15 +25,10 @@ public class OutputStreamSink {
public static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics"){};
private static MapStateDescriptor<String, Map<String, Map<String, List<Integer>>>> descriptor = new MapStateDescriptor<>("boradcast-state",
Types.STRING,
new MapTypeInfo<>(String.class, new MapTypeInfo<>(String.class, (Class<List<Integer>>) (Class<?>) List.class).getTypeClass()));
public static void finalOutputSink(){
try {
SingleOutputStreamOperator<DosSketchLog> middleStream = getMiddleStream();
SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream = getOutputSinkStream(middleStream);
DosEventSink.dosEventOutputSink(dosEventLogOutputStream);
DosEventSink.dosEventOutputSink(getEventSinkStream(middleStream));
TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);
FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME);
} catch (Exception e) {
@@ -50,89 +36,25 @@ public class OutputStreamSink {
}
}
public static void main(String[] args) throws Exception {
SingleOutputStreamOperator<DosSketchLog> middleStream = getMiddleStream();
SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream = getOutputSinkStream(middleStream);
DosEventSink.dosEventOutputSink(dosEventLogOutputStream);
TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);
dosEventLogOutputStream.print();
FlinkEnvironmentUtils.streamExeEnv.execute();
}
private static SingleOutputStreamOperator<DosEventLog> getOutputSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
BroadcastStream<Map<String, Map<String,List<Integer>>>> broadcast = FlinkEnvironmentUtils.streamExeEnv
.addSource(new BaselineSource())
.setParallelism(CommonConfig.HBASE_INPUT_PARALLELISM)
.broadcast(descriptor);
logger.info("广播变量加载成功!!");
return middleStream.keyBy(new SecondKeySelector())
// .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
.reduce(new SecondReduceFunc())
.connect(broadcast)
.process(new DosDetection())
.setParallelism(CommonConfig.FLINK_SECOND_AGG_PARALLELISM);
private static SingleOutputStreamOperator<DosEventLog> getEventSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
return middleStream.map(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
}
private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){
return ParseSketchLog.getSketchSource()
.keyBy(new FirstKeySelector())
.keyBy(new KeysSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME)))
.process(new EtlProcessFunction())
.setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM);
}
private static String groupUniqSourceIp(String sourceIp1,String sourceIp2){
HashSet<String> sourceIpSet = new HashSet<>();
Collections.addAll(sourceIpSet, (sourceIp1 + "," + sourceIp2).split(","));
if (sourceIpSet.size() > CommonConfig.SOURCE_IP_LIST_LIMIT){
return StringUtils.join(takeUniqLimit(sourceIpSet,CommonConfig.SOURCE_IP_LIST_LIMIT),",");
}
return StringUtils.join(sourceIpSet,",");
}
private static<T> Collection<T> takeUniqLimit(Collection<T> collection, int limit){
int i =0;
Collection<T> newSet = new HashSet<>();
for (T t:collection){
if (i < limit){
newSet.add(t);
i += 1;
}
}
return newSet;
}
private static class FirstKeySelector implements KeySelector<DosSketchLog, Tuple4<String, String, String, String>>{
private static class KeysSelector implements KeySelector<DosSketchLog, Tuple2<String, String>>{
@Override
public Tuple4<String, String, String, String> getKey(DosSketchLog dosSketchLog) throws Exception {
return Tuple4.of(
dosSketchLog.getCommon_sled_ip(),
dosSketchLog.getCommon_data_center(),
dosSketchLog.getAttack_type(),
dosSketchLog.getDestination_ip());
}
}
private static class SecondKeySelector implements KeySelector<DosSketchLog, Tuple2<String, String>> {
@Override
public Tuple2<String, String> getKey(DosSketchLog dosSketchLog) throws Exception {
public Tuple2<String, String> getKey(DosSketchLog dosSketchLog){
return Tuple2.of(
dosSketchLog.getAttack_type(),
dosSketchLog.getDestination_ip());
}
}
private static class SecondReduceFunc implements ReduceFunction<DosSketchLog> {
@Override
public DosSketchLog reduce(DosSketchLog value1, DosSketchLog value2) throws Exception {
value1.setSketch_sessions((value1.getSketch_sessions()+value2.getSketch_sessions())/2);
value1.setSketch_bytes((value1.getSketch_bytes()+value2.getSketch_bytes())/2);
value1.setSketch_packets((value1.getSketch_packets()+value2.getSketch_packets())/2);
value1.setSource_ip(groupUniqSourceIp(value1.getSource_ip(),value2.getSource_ip()));
return value1;
}
}
}

View File

@@ -1,128 +0,0 @@
package com.zdjizhi.source;
import com.zdjizhi.common.CommonConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author wlh
*/
public class BaselineSource extends RichSourceFunction<Map<String, Map<String,List<Integer>>>> {
private static final Logger logger = LoggerFactory.getLogger(BaselineSource.class);
private Connection conn = null;
private Table table = null;
private Scan scan = null;
@Override
public void open(Configuration parameters) throws Exception {
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
config.set("hbase.client.retries.number", "3");
config.set("hbase.bulkload.retries.number", "3");
config.set("zookeeper.recovery.retry", "3");
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, CommonConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CommonConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
conn = ConnectionFactory.createConnection(config);
table = conn.getTable(tableName);
scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
logger.info("连接hbase成功正在读取baseline数据");
// .addFamily(Bytes.toBytes(CommonConfig.HBASE_BASELINE_FAMLIY_NAME));
}
@Override
public void close() throws Exception {
super.close();
}
@Override
public void run(SourceContext<Map<String, Map<String,List<Integer>>>> sourceContext) throws Exception {
logger.info("开始读取baseline数据");
ResultScanner rs = table.getScanner(scan);
// Map<String, List<Integer>[]> baselineMap = new HashMap<>();
Map<String, Map<String,List<Integer>>> baselineMap = new HashMap<>();
for (Result result : rs) {
Map<String, List<Integer>> floodTypeMap = new HashMap<>();
String rowkey = Bytes.toString(result.getRow());
ArrayList<Integer> tcp = getArraylist(result,"TCP SYN Flood", "session_num");
ArrayList<Integer> udp = getArraylist(result,"UDP Flood", "session_num");
ArrayList<Integer> icmp = getArraylist(result,"ICMP Flood", "session_num");
ArrayList<Integer> dns = getArraylist(result,"DNS Amplification", "session_num");
floodTypeMap.put("TCP SYN Flood",tcp);
floodTypeMap.put("UDP Flood",udp);
floodTypeMap.put("ICMP Flood",icmp);
floodTypeMap.put("DNS Amplification",dns);
// List[] arr = new ArrayList[]{tcp,udp,icmp,dns};
baselineMap.put(rowkey,floodTypeMap);
}
sourceContext.collect(baselineMap);
logger.info("格式化baseline数据成功读取IP共{}",baselineMap.size());
}
private static ArrayList<Integer> getArraylist(Result result,String family,String qualifier) throws IOException {
if (!result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier))){
return null;
}
ArrayWritable w = new ArrayWritable(IntWritable.class);
w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
return fromWritable(w);
}
private static ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = writable.get();
ArrayList<Integer> list = new ArrayList<>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
}
return list;
}
@Override
public void cancel() {
try {
if (table != null) {
table.close();
}
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
DataStreamSource<Map<String, Map<String,List<Integer>>>> mapDataStreamSource = env.addSource(new BaselineSource());
DataStream<Map<String, Map<String,List<Integer>>>> broadcast = mapDataStreamSource.broadcast();
mapDataStreamSource.print();
env.execute();
}
}

View File

@@ -9,6 +9,9 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
/**
* @author wlh
*/
public class DosSketchSource {
private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;
@@ -17,6 +20,11 @@ public class DosSketchSource {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
}
return streamExeEnv.addSource(new FlinkKafkaConsumer<String>(
CommonConfig.KAFKA_INPUT_TOPIC_NAME,

View File

@@ -0,0 +1,23 @@
package com.zdjizhi.utils;
import java.util.Collection;
import java.util.HashSet;
/**
* @author wlh
* 扩展集合处理工具
*/
public class CollectionUtils {
public static<T> Collection<T> takeUniqueLimit(Collection<T> collection, int limit){
int i =0;
Collection<T> newSet = new HashSet<>();
for (T t:collection){
if (i < limit){
newSet.add(t);
i += 1;
}
}
return newSet;
}
}

View File

@@ -1,9 +1,9 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
@@ -12,17 +12,35 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkEnvironmentUtils {
public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
public static StreamTableEnvironment getStreamTableEnv() {
static {
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
/*
// 每 1000ms 开始一次 checkpoint
streamExeEnv.enableCheckpointing(CommonConfig.FLINK_WINDOW_MAX_TIME * 1000);
return StreamTableEnvironment.create(streamExeEnv, settings);
// 设置模式为精确一次 (这是默认值)
streamExeEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确认 checkpoints 之间的时间会进行 500 ms
streamExeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
streamExeEnv.getCheckpointConfig().setCheckpointTimeout(60000);
// 允许两个连续的 checkpoint 错误
streamExeEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// 同一时间只允许一个 checkpoint 进行
streamExeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 使用 externalized checkpoints这样 checkpoint 在作业取消后仍就会被保留
streamExeEnv.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 开启实验性的 unaligned checkpoints
streamExeEnv.getCheckpointConfig().enableUnalignedCheckpoints();
*/
}
}

View File

@@ -1,5 +1,49 @@
package com.zdjizhi.utils;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.*;
/**
* @author wlh
*/
public class HbaseUtils {
public static Integer getIntegerValue(Result result, String family, String qualifier) {
byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
if (value != null){
return Bytes.toInt(value);
}
return 1;
}
public static ArrayList<Integer> getArraylist(Result result, String family, String qualifier) throws IOException {
if (containsColumn(result, family, qualifier)) {
ArrayWritable w = new ArrayWritable(IntWritable.class);
w.readFields(new DataInputStream(new ByteArrayInputStream(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))));
return fromWritable(w);
}
return null;
}
private static ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = writable.get();
ArrayList<Integer> list = new ArrayList<>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable) wrt).get());
}
return list;
}
private static boolean containsColumn(Result result, String family, String qualifier) {
return result.containsColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
}
}

View File

@@ -0,0 +1,269 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.http.*;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* http client工具类
* @author wlh
*/
public class HttpClientUtils {
/** 全局连接池对象 */
private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager();
private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
public static final String ERROR_MESSAGE = "-1";
/*
* 静态代码块配置连接池信息
*/
static {
// 设置最大连接数
CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
// 设置每个连接的路由数
CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
}
/**
* 获取Http客户端连接对象
* @return Http客户端连接对象
*/
private static CloseableHttpClient getHttpClient() {
// 创建Http请求配置参数
RequestConfig requestConfig = RequestConfig.custom()
// 获取连接超时时间
.setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT)
// 请求超时时间
.setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT)
// 响应超时时间
.setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT)
.build();
/*
* 测出超时重试机制为了防止超时不生效而设置
* 如果直接放回false,不重试
* 这里会根据情况进行判断是否重试
*/
HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
if (executionCount >= 3) {// 如果已经重试了3次就放弃
return false;
}
if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
return true;
}
if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
return false;
}
if (exception instanceof UnknownHostException) {// 目标服务器不可达
return false;
}
if (exception instanceof ConnectTimeoutException) {// 连接被拒绝
return false;
}
if (exception instanceof HttpHostConnectException) {// 连接被拒绝
return false;
}
if (exception instanceof SSLException) {// ssl握手异常
return false;
}
if (exception instanceof InterruptedIOException) {// 超时
return true;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
// 如果请求是幂等的,就再次尝试
return !(request instanceof HttpEntityEnclosingRequest);
};
ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator
(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && "timeout".equalsIgnoreCase(param)) {
return Long.parseLong(value) * 1000;
}
}
return 60 * 1000;//如果没有约定则默认定义时长为60s
};
// 创建httpClient
return HttpClients.custom()
// 把请求相关的超时信息设置到连接客户端
.setDefaultRequestConfig(requestConfig)
// 把请求重试设置到连接客户端
.setRetryHandler(retry)
.setKeepAliveStrategy(myStrategy)
// 配置连接池管理对象
.setConnectionManager(CONN_MANAGER)
.build();
}
/**
* GET请求
*
* @param uri 请求地
* @return message
*/
public static String httpGet(URI uri, Header... headers) {
String msg = ERROR_MESSAGE;
// 获取客户端连接对象
CloseableHttpClient httpClient = getHttpClient();
CloseableHttpResponse response = null;
try {
logger.info("http get uri {}",uri);
// 创建GET请求对象
HttpGet httpGet = new HttpGet(uri);
if (StringUtil.isNotEmpty(headers)) {
for (Header h : headers) {
httpGet.addHeader(h);
logger.info("request header : {}",h);
}
}
// 执行请求
response = httpClient.execute(httpGet);
int statusCode = response.getStatusLine().getStatusCode();
// 获取响应实体
HttpEntity entity = response.getEntity();
// 获取响应信息
msg = EntityUtils.toString(entity, "UTF-8");
if (statusCode != HttpStatus.SC_OK) {
logger.error("Http get content is :{}" , msg);
}
} catch (ClientProtocolException e) {
logger.error("协议错误: {}", e.getMessage());
} catch (ParseException e) {
logger.error("解析错误: {}", e.getMessage());
} catch (IOException e) {
logger.error("IO错误: {}",e.getMessage());
} finally {
if (null != response) {
try {
EntityUtils.consume(response.getEntity());
response.close();
} catch (IOException e) {
logger.error("释放链接错误: {}", e.getMessage());
}
}
}
return msg;
}
/**
* POST 请求
* @param uri uri参数
* @param requestBody 请求体
* @return post请求返回结果
*/
public static String httpPost(URI uri, String requestBody, Header... headers) {
String msg = ERROR_MESSAGE;
// 获取客户端连接对象
CloseableHttpClient httpClient = getHttpClient();
// 创建POST请求对象
CloseableHttpResponse response = null;
try {
logger.info("http post uri:{} http post body:{}", uri, requestBody);
HttpPost httpPost = new HttpPost(uri);
httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
if (StringUtil.isNotEmpty(headers)) {
for (Header h : headers) {
httpPost.addHeader(h);
logger.info("request header : {}",h);
}
}
if(StringUtil.isNotBlank(requestBody)) {
byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8);
httpPost.setEntity(new ByteArrayEntity(bytes));
}
response = httpClient.execute(httpPost);
int statusCode = response.getStatusLine().getStatusCode();
// 获取响应实体
HttpEntity entity = response.getEntity();
// 获取响应信息
msg = EntityUtils.toString(entity, "UTF-8");
if (statusCode != HttpStatus.SC_OK) {
logger.error("Http post content is :{}" , msg);
}
} catch (ClientProtocolException e) {
logger.error("协议错误: {}", e.getMessage());
} catch (ParseException e) {
logger.error("解析错误: {}", e.getMessage());
} catch (IOException e) {
logger.error("IO错误: {}", e.getMessage());
} finally {
if (null != response) {
try {
EntityUtils.consumeQuietly(response.getEntity());
response.close();
} catch (IOException e) {
logger.error("释放链接错误: {}", e.getMessage());
}
}
}
return msg;
}
/**
* 拼装url
* url ,参数map
*/
public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map<String, Object> params) {
try {
uriBuilder.setPath(path);
if (params != null && !params.isEmpty()){
for (Map.Entry<String, Object> kv : params.entrySet()) {
uriBuilder.setParameter(kv.getKey(),kv.getValue().toString());
}
}
} catch (Exception e) {
logger.error("拼接url出错,uri : {}, path : {},参数: {}",uriBuilder.toString(),path,params);
}
}
}

View File

@@ -7,15 +7,21 @@ public class IpUtils {
/**
* IP定位库工具类
*/
public static IpLookup ipLookup = new IpLookup.Builder(false)
// .loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4.mmdb")
// .loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6.mmdb")
.loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_private_v4.mmdb")
// .loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_private_v6.mmdb")
public static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
.loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4_built_in.mmdb")
.loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6_built_in.mmdb")
.loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_v4_user_defined.mmdb")
.loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_v6_user_defined.mmdb")
.build();
public static void main(String[] args) {
System.out.println(ipLookup.countryLookup("61.128.159.186"));
System.out.println(ipLookup.countryLookup("49.7.115.37"));
String ips = "182.168.50.23,182.168.50.45,182.168.56.9,182.168.56.8,92.168.50.58,19.168.56.7,12.168.56.6,2.168.50.40,1.168.50.19,9.168.50.6,2.168.50.4,192.168.56.17,192.168.50.27,192.168.50.26,192.168.50.18,192.168.56.3,192.168.56.10";
for (String ip:ips.split(",")){
System.out.println(ip+"--"+ipLookup.countryLookup(ip));
}
}

View File

@@ -4,23 +4,32 @@ import com.zdjizhi.common.CommonConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Optional;
import java.util.Properties;
public class KafkaUtils {
private static Properties getKafkaSinkProperty(){
Properties propertiesproducer = new Properties();
propertiesproducer.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
if (CommonConfig.SASL_JAAS_CONFIG_FLAG == 1){
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
}
return propertiesproducer;
return properties;
}
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
return new FlinkKafkaProducer<String>(
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
topic,
new SimpleStringSchema(),
getKafkaSinkProperty()
getKafkaSinkProperty(),
Optional.empty()
);
kafkaProducer.setLogFailuresOnly(true);
return kafkaProducer;
}
}

View File

@@ -0,0 +1,90 @@
package com.zdjizhi.utils;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.concurrent.Executor;
public class NacosUtils {
private static final Logger logger = LoggerFactory.getLogger(NacosUtils.class);
private static Properties nacosProperties = new Properties();
private static Properties commonProperties = new Properties();
private static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr");
private static final String NACOS_NAMESPACE = CommonConfigurations.getStringProperty("nacos.namespace");
private static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username");
private static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password");
private static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id");
private static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group");
private static final long NACOS_READ_TIMEOUT = CommonConfigurations.getLongProperty("nacos.read.timeout");
static {
createConfigService();
}
private static void getProperties() {
nacosProperties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_SERVER_ADDR);
nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, NACOS_NAMESPACE);
nacosProperties.setProperty(PropertyKeyConst.USERNAME, NACOS_USERNAME);
nacosProperties.setProperty(PropertyKeyConst.PASSWORD, NACOS_PASSWORD);
}
private static void createConfigService() {
try {
getProperties();
ConfigService configService = NacosFactory.createConfigService(nacosProperties);
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
commonProperties.load(new StringReader(config));
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
try {
commonProperties.clear();
commonProperties.load(new StringReader(configMsg));
} catch (IOException e) {
logger.error("监听nacos配置失败", e);
}
System.out.println(configMsg);
}
});
} catch (Exception e) {
e.printStackTrace();
logger.error("获取nacos配置失败", e);
}
}
public static String getStringProperty(String key) {
return commonProperties.getProperty(key);
}
public static Integer getIntProperty(String key) {
return Integer.parseInt(commonProperties.getProperty(key));
}
public static Double getDoubleProperty(String key) {
return Double.parseDouble(commonProperties.getProperty(key));
}
public static Long getLongProperty(String key) {
return Long.parseLong(commonProperties.getProperty(key));
}
public static Boolean getBooleanProperty(String key) {
return "true".equals(commonProperties.getProperty(key).toLowerCase().trim());
}
}

View File

@@ -1,41 +1,138 @@
#flink运行环境并行度其优先级低于算子并行度如果未设置算子并行度则使用该数值
stream.execution.environment.parallelism=1
stream.execution.job.name=dos-detection-job
#flink任务名一般不变
stream.execution.job.name=DOS-DETECTION-APPLICATION
#输入kafka并行度大小
kafka.input.parallelism=1
kafka.input.topic.name=DOS-SKETCH-LOG
kafka.input.bootstrap.servers=192.168.44.12:9092
kafka.input.scan.startup.mode=latest-offset
kafka.input.group.id=2108041426
#kafka.input.group.id=test
#输入kafka topic名
kafka.input.topic.name=DOS-SKETCH-RECORD
#输入kafka地址
#kafka.input.bootstrap.servers=192.168.44.12:9094
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#读取kafka group id
kafka.input.group.id=2112080949
#kafka.input.group.id=dos-detection-job-210813-1
#发送kafka metrics并行度大小
kafka.output.metric.parallelism=1
kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS-LOG
kafka.output.event.parallelism=1
kafka.output.event.topic.name=DOS-EVENT-LOG
kafka.output.bootstrap.servers=192.168.44.12:9092
hbase.input.parallelism=1
#发送kafka metrics topic名
kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
#kafka.output.metric.topic.name=test
#发送kafka event并行度大小
kafka.output.event.parallelism=1
#发送kafka event topic名
kafka.output.event.topic.name=DOS-EVENT
#kafka.output.event.topic.name=storm-dos-test
#kafka输出地址
kafka.output.bootstrap.servers=192.168.44.12:9094
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#zookeeper地址
hbase.zookeeper.quorum=192.168.44.12:2181
#hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
#hbase客户端处理时间
hbase.client.operation.timeout=30000
hbase.client.scanner.timeout.period=30000
hbase.baseline.table.name=ddos_traffic_baselines
##hbase baseline表名
hbase.baseline.table.name=dos:ddos_traffic_baselines
#读取baseline限制
hbase.baseline.total.num=1000000
flink.first.agg.parallelism=1
flink.second.agg.parallelism=1
flink.watermark.max.orderness=1
flink.window.max.time=600
#baseline ttl单位
hbase.baseline.ttl=30
#设置聚合并行度2个key
flink.first.agg.parallelism=1
#设置结果判定并行度
flink.detection.map.parallelism=1
#watermark延迟
flink.watermark.max.orderness=10
#计算窗口大小默认600s
flink.window.max.time=10
#dos event结果中distinct source IP限制
source.ip.list.limit=10000
#基于目的IP的分区数默认为10000一般不变
destination.ip.partition.num=10000
data.center.id.num=15
ip.mmdb.path=D:\\data\\dat_test\\
#IP mmdb库路径
ip.mmdb.path=D:\\data\\dat\\
#ip.mmdb.path=/home/bigdata/topology/dat/
#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
baseline.sessions.minor.threshold=0.1
baseline.sessions.warning.threshold=0.5
baseline.sessions.major.threshold=1
baseline.sessions.severe.threshold=3
baseline.sessions.critical.threshold=8
#bifang服务访问地址
bifang.server.uri=http://192.168.44.72:80
#bifang.server.uri=http://192.168.44.3:80
#访问bifang只读权限tokenbifang内置无需修改
bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867
#加密密码路径信息
bifang.server.encryptpwd.path=/v1/user/encryptpwd
#登录bifang服务路径信息
bifang.server.login.path=/v1/user/login
#获取vaysId路径信息
bifang.server.policy.vaysid.path=/v1/system/vsys/
#获取静态阈值路径信息
bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold
#http请求相关参数
#最大连接数
http.pool.max.connection=400
#单路由最大连接数
http.pool.max.per.route=80
#向服务端请求超时时间设置(单位:毫秒)
http.pool.request.timeout=60000
#向服务端连接超时时间设置(单位:毫秒)
http.pool.connect.timeout=60000
#服务端响应超时时间设置(单位:毫秒)
http.pool.response.timeout=60000
#获取静态阈值周期,默认十分钟
static.threshold.schedule.minutes=10
#获取baseline周期默认7天
baseline.threshold.schedule.days=1
#kafka用户认证配置参数
sasl.jaas.config.user=admin
#sasl.jaas.config.password=galaxy2019
#sasl.jaas.config.password=ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)
sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#是否开启kafka用户认证配置10
sasl.jaas.config.flag=1
#nacos配置
nacos.server.addr=192.168.44.12:8848
nacos.namespace=test
nacos.username=nacos
nacos.password=nacos
nacos.data.id=dos_detection.properties
nacos.group=Galaxy
nacos.read.timeout=5000

View File

@@ -0,0 +1,7 @@
package com.zdjizhi.common;
public class HttpTest {
public static void main(String[] args) throws Exception {
}
}

View File

@@ -0,0 +1,106 @@
package com.zdjizhi.common;
import inet.ipaddr.Address;
import inet.ipaddr.AddressStringException;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import inet.ipaddr.format.util.AddressTrieMap;
import inet.ipaddr.format.util.AssociativeAddressTrie;
import inet.ipaddr.ipv4.IPv4Address;
import inet.ipaddr.ipv4.IPv4AddressAssociativeTrie;
import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class IpTest {
public static void main(String[] args) throws Exception {
IPv4AddressAssociativeTrie<Integer> trie = new IPv4AddressAssociativeTrie<>();
IPAddress str1 = new IPAddressString("1.2.3.4").getAddress();
IPAddress str2 = new IPAddressString("10.0.0.0/15").getAddress();
IPAddress str3 = new IPAddressString("25.4.2.0/23").getAddress();
IPAddress str4 = new IPAddressString("192.168.8.0/21").getAddress();
IPAddress str5 = new IPAddressString("240.0.0.0/4").getAddress();
IPAddress str6 = new IPAddressString("fc00::0/64").getAddress();
IPAddress str7 = new IPAddressString("fc00::10:1").getAddress();
TreeRangeMap<IPAddress, Object> rangeMap = TreeRangeMap.create();
rangeMap.put(Range.closed(str1.getLower(),str1.getUpper()),1);
rangeMap.put(Range.closed(str2.getLower(),str2.getUpper()),2);
rangeMap.put(Range.closed(str3.getLower(),str3.getUpper()),3);
rangeMap.put(Range.closed(str4.getLower(),str4.getUpper()),4);
rangeMap.put(Range.closed(str5.getLower(),str5.getUpper()),5);
rangeMap.put(Range.closed(str6.getLower(),str6.getUpper()),6);
rangeMap.put(Range.closed(str7.getLower(),str7.getUpper()),7);
IPAddress pv4 = new IPAddressString("255.255.14.255").getAddress();
IPAddress pv42 = new IPAddressString("1.2.3.4").getAddress();
IPAddress pv43 = new IPAddressString("fc00::").getAddress();
IPAddress pv44 = new IPAddressString("fc00::10:1").getAddress();
IPAddress pv45 = new IPAddressString("192.168.42.1").getAddress();
IPAddress pv46 = new IPAddressString("192.168.42.1/32").getAddress();
IPAddress pv47 = new IPAddressString("12.56.4.0").getAddress();
IPAddress mask = pv45.getNetwork().getNetworkMask(24, false);
System.out.println(pv45.isMultiple());
System.out.println(pv46.isMultiple());
System.out.println(pv46.isPrefixed());
System.out.println(pv47.isPrefixed());
System.out.println(pv45+"---"+pv45.toMaxHost().withoutPrefixLength()+"---"+pv45.adjustPrefixLength(pv45.getBitCount()));
System.out.println(pv45+"---mask:"+pv45.mask(mask).toString());
System.out.println(pv45.adjustPrefixLength(pv45.getBitCount())+"---"+pv45.toMaxHost().withoutPrefixLength());
/*
System.out.println(str5.getUpper()+"---"+str5.getLower());
System.out.println(rangeMap.span().contains(pv4));
System.out.println(rangeMap.get(pv4));
System.out.println(rangeMap.get(pv42));
System.out.println(rangeMap.get(pv43));
System.out.println(rangeMap.get(pv44));
*/
/*
System.out.println(str5.toSequentialRange());
// System.out.println(str2.contains(new IPAddressString("10.0.0.2")));
// System.out.println(str5.toAddress().toIPv4().toSequentialRange());
trie.put(str1,1);
trie.put(str2,2);
trie.put(str3,3);
trie.put(str4,4);
trie.put(str5,5);
AddressTrieMap<IPv4Address, Integer> trieMap = new AddressTrieMap<>(trie);
trieMap.forEach((k,v) -> {
System.out.println(k.toString() + "--" + v);
});
System.out.println("-----------------");
trie.forEach((k) -> System.out.println(k.toString()));
System.out.println(str5.contains(pv4));
System.out.println(trie.contains(pv4));
System.out.println(trieMap.get(pv4));
System.out.println(trieMap.containsKey(pv4));
// System.out.println(trieMap.getRange());
// IPAddress str3 = new IPAddressString("fc00::10:1").getAddress();
// IPAddress str4 = new IPAddressString("fc00::10:2/64").getAddress();
// System.out.println(Arrays.toString(str1.mergeToPrefixBlocks(str2,str3,str4)));
*/
}
}

View File

@@ -0,0 +1,101 @@
package com.zdjizhi.common;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import org.junit.Test;
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2022/3/1016:58
*/
public class NacosTest {
/**
* <dependency>
* <groupId>com.alibaba.nacos</groupId>
* <artifactId>nacos-client</artifactId>
* <version>1.2.0</version>
* </dependency>
*/
private static Properties properties = new Properties();
/**
* config data id = config name
*/
private static final String DATA_ID = "dos_baseline.properties";
/**
* config group
*/
private static final String GROUP = "Galaxy";
private void getProperties() {
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
properties.setProperty(PropertyKeyConst.NAMESPACE, "test");
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
}
@Test
public void GetConfigurationTest() {
try {
getProperties();
ConfigService configService = NacosFactory.createConfigService(properties);
String content = configService.getConfig(DATA_ID, GROUP, 5000);
Properties nacosConfigMap = new Properties();
nacosConfigMap.load(new StringReader(content));
System.out.println(nacosConfigMap.getProperty("static.sensitivity.threshold"));
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void ListenerConfigurationTest() {
getProperties();
try {
//first get config
ConfigService configService = NacosFactory.createConfigService(properties);
String config = configService.getConfig(DATA_ID, GROUP, 5000);
// System.out.println(config);
//start listenner
configService.addListener(DATA_ID, GROUP, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
System.out.println(configMsg);
}
});
} catch (Exception e) {
e.printStackTrace();
}
//keep running,change nacos config,print new config
/*
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
*/
}
}

View File

@@ -1,17 +0,0 @@
package com.zdjizhi.common;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
public class UdtfTest extends TableFunction<Row> {
public void eval(Row[] rows) {
for (Row row : rows) {
collect(row);
}
}
public static void main(String[] args) {
}
}