Compare commits: feature/ts...feature/re
6 commits
| Author | SHA1 | Date |
|---|---|---|
|  | ea09d5dfb1 |  |
|  | fae16e5804 |  |
|  | 046b7fea80 |  |
|  | 62bffa9731 |  |
|  | 77ec061844 |  |
|  | a17666abff |  |
pom.xml (15 lines changed)

```diff
@@ -6,7 +6,7 @@
     <groupId>com.zdjizhi</groupId>
     <artifactId>flink-dos-detection</artifactId>
-    <version>23.12</version>
+    <version>1.0</version>
 
     <name>flink-dos-detection</name>
     <url>http://www.example.com</url>
@@ -56,14 +56,15 @@
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.8.0</version>
                 <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
+                    <source>11</source>
+                    <target>11</target>
                     <!-- The semantics of this option are reversed, see MCOMPILER-209. -->
                     <useIncrementalCompilation>false</useIncrementalCompilation>
                     <compilerArgs>
                         <!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->
                         <arg>-Xpkginfo:always</arg>
                     </compilerArgs>
 
                 </configuration>
             </plugin>
 
@@ -80,6 +81,12 @@
 
                     <configuration>
                         <finalName>flink-dos-detection</finalName>
+                        <relocations>
+                            <relocation>
+                                <pattern>org.apache.http</pattern>
+                                <shadedPattern>shade.org.apache.http</shadedPattern>
+                            </relocation>
+                        </relocations>
                         <filters>
                             <filter>
                                 <!-- Do not copy the signatures in the META-INF folder.
@@ -207,7 +214,7 @@
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <version>2.2.3</version>
-            <scope>provided</scope>
+            <!-- <scope>provided</scope>-->
             <exclusions>
                 <exclusion>
                     <artifactId>slf4j-log4j12</artifactId>
```
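The new shade relocation rewrites every `org.apache.http` reference inside the fat jar to `shade.org.apache.http`, which keeps the bundled HttpClient from clashing with the copy that Flink or HBase already puts on the classpath. A minimal sketch of how one might verify the relocation against the built jar (the class name below is illustrative, not confirmed against this repo's dependency tree):

```java
// Hypothetical smoke test: run with only the shaded flink-dos-detection jar on the classpath.
public class RelocationCheck {
    public static void main(String[] args) {
        try {
            // After relocation the shaded HttpClient classes live under the new package.
            Class.forName("shade.org.apache.http.client.methods.HttpGet");
            System.out.println("shaded HttpClient found");
        } catch (ClassNotFoundException e) {
            System.out.println("jar was built without the relocation");
        }
    }
}
```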
src/main/java/com/zdjizhi/common/DosDetectionThreshold.java

```diff
@@ -3,29 +3,28 @@ package com.zdjizhi.common
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Objects;
 
 /**
  * @author wlh
  */
 public class DosDetectionThreshold implements Serializable {
-    private long profile_id;
+    private long id;
     private String attack_type;
     private ArrayList<String> server_ip_list;
     private String server_ip_addr;
     private long packets_per_sec;
     private long bits_per_sec;
     private long sessions_per_sec;
-    private int is_valid;
+    private int is_enabled;
     private int vsys_id;
     private Integer[] superior_ids;
 
-    public long getProfile_id() {
-        return profile_id;
+    public long getId() {
+        return id;
     }
 
-    public void setProfile_id(long profile_id) {
-        this.profile_id = profile_id;
+    public void setId(long id) {
+        this.id = id;
     }
 
     public String getAttack_type() {
@@ -76,12 +75,12 @@ public class DosDetectionThreshold implements Serializable {
         this.sessions_per_sec = sessions_per_sec;
     }
 
-    public int getIs_valid() {
-        return is_valid;
+    public int getIs_enabled() {
+        return is_enabled;
     }
 
-    public void setIs_valid(int is_valid) {
-        this.is_valid = is_valid;
+    public void setIs_enabled(int is_enabled) {
+        this.is_enabled = is_enabled;
     }
 
     public int getVsys_id() {
@@ -103,14 +102,14 @@ public class DosDetectionThreshold implements Serializable {
     @Override
     public String toString() {
         return "DosDetectionThreshold{" +
-                "profile_id=" + profile_id +
+                "id=" + id +
                 ", attack_type='" + attack_type + '\'' +
                 ", server_ip_list=" + server_ip_list +
                 ", server_ip_addr='" + server_ip_addr + '\'' +
                 ", packets_per_sec=" + packets_per_sec +
                 ", bits_per_sec=" + bits_per_sec +
                 ", sessions_per_sec=" + sessions_per_sec +
-                ", is_valid=" + is_valid +
+                ", is_enabled=" + is_enabled +
                 ", vsys_id=" + vsys_id +
                 ", superior_ids=" + Arrays.toString(superior_ids) +
                 '}';
```
src/main/java/com/zdjizhi/common/FlowWriteConfig.java (deleted)

```diff
@@ -1,91 +0,0 @@
-package com.zdjizhi.common;
-
-import com.zdjizhi.utils.CommonConfigurations;
-import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
-
-/**
- * @author wlh
- * @date 2021/1/6
- */
-public class FlowWriteConfig {
-
-    /**
-     * Default delimiter of the IP location library
-     */
-
-    private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
-
-    static {
-        encryptor.setPassword("galaxy");
-    }
-
-    public static final int STREAM_EXECUTION_ENVIRONMENT_PARALLELISM = CommonConfigurations.getIntProperty("stream.execution.environment.parallelism");
-    public static final String STREAM_EXECUTION_JOB_NAME = CommonConfigurations.getStringProperty("stream.execution.job.name");
-
-    public static final int KAFKA_INPUT_PARALLELISM = CommonConfigurations.getIntProperty("kafka.input.parallelism");
-    public static final String KAFKA_INPUT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.input.topic.name");
-    public static final String KAFKA_INPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.input.bootstrap.servers");
-    public static final String KAFKA_GROUP_ID = CommonConfigurations.getStringProperty("kafka.input.group.id");
-
-    public static final int KAFKA_OUTPUT_METRIC_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.metric.parallelism");
-    public static final String KAFKA_OUTPUT_METRIC_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.metric.topic.name");
-    public static final int KAFKA_OUTPUT_EVENT_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.event.parallelism");
-    public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.event.topic.name");
-    public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.output.bootstrap.servers");
-
-    public static final String HBASE_ZOOKEEPER_QUORUM = CommonConfigurations.getStringProperty("hbase.zookeeper.quorum");
-    public static final int HBASE_CLIENT_OPERATION_TIMEOUT = CommonConfigurations.getIntProperty("hbase.client.operation.timeout");
-    public static final int HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = CommonConfigurations.getIntProperty("hbase.client.scanner.timeout.period");
-
-    public static final String HBASE_BASELINE_TABLE_NAME = CommonConfigurations.getStringProperty("hbase.baseline.table.name");
-    public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num");
-    public static final int HBASE_BASELINE_TTL = CommonConfigurations.getIntProperty("hbase.baseline.ttl");
-
-    public static final int FLINK_FIRST_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.first.agg.parallelism");
-    public static final int FLINK_DETECTION_MAP_PARALLELISM = CommonConfigurations.getIntProperty("flink.detection.map.parallelism");
-    public static final int FLINK_WATERMARK_MAX_ORDERNESS = CommonConfigurations.getIntProperty("flink.watermark.max.orderness");
-    public static final int FLINK_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("flink.window.max.time");
-
-    public static final int SOURCE_IP_LIST_LIMIT = CommonConfigurations.getIntProperty("source.ip.list.limit");
-    public static final int DESTINATION_IP_PARTITION_NUM = CommonConfigurations.getIntProperty("destination.ip.partition.num");
-    public static final int DATA_CENTER_ID_NUM = CommonConfigurations.getIntProperty("data.center.id.num");
-
-    public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri");
-    public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = CommonConfigurations.getStringProperty("bifang.server.encryptpwd.path");
-    public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path");
-    public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path");
-
-    public static final String BIFANG_SERVER_POLICY_VSYSID_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.vaysid.path");
-
-    public static final int HTTP_POOL_MAX_CONNECTION = CommonConfigurations.getIntProperty("http.pool.max.connection");
-    public static final int HTTP_POOL_MAX_PER_ROUTE = CommonConfigurations.getIntProperty("http.pool.max.per.route");
-    public static final int HTTP_POOL_REQUEST_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.request.timeout");
-    public static final int HTTP_POOL_CONNECT_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.connect.timeout");
-    public static final int HTTP_POOL_RESPONSE_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.response.timeout");
-
-    public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = CommonConfigurations.getIntProperty("static.threshold.schedule.minutes");
-    public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days");
-
-    public static final String SASL_JAAS_CONFIG_USER = CommonConfigurations.getStringProperty("sasl.jaas.config.user");
-    public static final String SASL_JAAS_CONFIG_PASSWORD = encryptor.decrypt(CommonConfigurations.getStringProperty("sasl.jaas.config.password"));
-
-    public static final int SASL_JAAS_CONFIG_FLAG = CommonConfigurations.getIntProperty("sasl.jaas.config.flag");
-
-    public static final String NACOS_SERVER = CommonConfigurations.getStringProperty("nacos.server.addr");
-    public static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username");
-    public static final String NACOS_PIN = CommonConfigurations.getStringProperty("nacos.password");
-    public static final String NACOS_PUBLIC_NAMESPACE = CommonConfigurations.getStringProperty("nacos.namespace");
-    public static final String NACOS_KNOWLEDGEBASE_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id");
-    public static final String NACOS_PUBLIC_GROUP = CommonConfigurations.getStringProperty("nacos.group");
-    public static final Integer NACOS_CONNECTION_TIMEOUT = CommonConfigurations.getIntProperty("nacos.connection.timeout");
-
-    public static final String NACOS_DOS_NAMESPACE = CommonConfigurations.getStringProperty("nacos.dos.namespace");
-    public static final String NACOS_DOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.dos.data.id");
-    public static final String NACOS_DOS_GROUP = CommonConfigurations.getStringProperty("nacos.dos.group");
-
-    public static final Integer HTTP_SOCKET_TIMEOUT = CommonConfigurations.getIntProperty("http.socket.timeout");
-
-}
```
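The deleted class decrypted the Kafka SASL password with jasypt's `StandardPBEStringEncryptor`, keyed by the hard-coded passphrase `"galaxy"` set in the static block. A minimal round-trip sketch of that mechanism (the plaintext value below is made up):

```java
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;

public class SaslPasswordDemo {
    public static void main(String[] args) {
        StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
        encryptor.setPassword("galaxy"); // same passphrase the static block used

        // What an operator would store under sasl.jaas.config.password...
        String cipherText = encryptor.encrypt("my-sasl-secret");
        // ...and what SASL_JAAS_CONFIG_PASSWORD resolved to at class-load time.
        System.out.println(encryptor.decrypt(cipherText));
    }
}
```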
src/main/java/com/zdjizhi/common/KnowlegeBaseMeta.java

```diff
@@ -6,26 +6,26 @@ import java.io.Serializable;
  *
  */
 public class KnowlegeBaseMeta implements Serializable {
-    private String id;
+    private String kb_id;
     private String name;
     private String sha256;
     private String format;
     private String path;
 
-    public KnowlegeBaseMeta(String id, String name, String sha256, String format, String path) {
-        this.id = id;
+    public KnowlegeBaseMeta(String kd_id, String name, String sha256, String format, String path) {
+        this.kb_id = kd_id;
         this.name = name;
         this.sha256 = sha256;
         this.format = format;
         this.path = path;
     }
 
-    public String getId() {
-        return id;
+    public String getKb_id() {
+        return kb_id;
     }
 
-    public void setId(String id) {
-        this.id = id;
+    public void setKb_id(String kb_id) {
+        this.kb_id = kb_id;
     }
 
     public String getName() {
@@ -63,7 +63,7 @@ public class KnowlegeBaseMeta implements Serializable {
     @Override
     public String toString() {
         return "KnowlegeBaseMeta{" +
-                "id='" + id + '\'' +
+                "kb_id='" + kb_id + '\'' +
                 ", name='" + name + '\'' +
                 ", sha256='" + sha256 + '\'' +
                 ", format='" + format + '\'' +
```
src/main/java/com/zdjizhi/conf/DosConfigs.java (new file, 264 lines)

```diff
@@ -0,0 +1,264 @@
+package com.zdjizhi.conf;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class DosConfigs {
+
+    /**
+     * The prefix for Kafka properties used in the source.
+     */
+    public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
+
+    /**
+     * The prefix for Kafka properties used in the sink.
+     */
+    public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
+
+    /**
+     * Configuration option for the Kafka topic used in the source.
+     */
+    public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
+            ConfigOptions.key("source.kafka.topic")
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<Long> FLINK_WINDOW_MAX_TIME =
+            ConfigOptions.key("flink.window.max.time")
+                    .longType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<Long> FLINK_WATERMARK_MAX_ORDERNESS =
+            ConfigOptions.key("flink.watermark.max.orderness")
+                    .longType()
+                    .noDefaultValue();
+
+    /**
+     * Configuration option for the Kafka topic used in the sink.
+     */
+    public static final ConfigOption<String> KAFKA_SINK_EVENT_TOPIC =
+            ConfigOptions.key("kafka.sink.event.topic.name")
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> KAFKA_SINK_METRIC_TOPIC =
+            ConfigOptions.key("kafka.sink.metric.topic")
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> HBASE_ZOOKEEPER_QUORUM =
+            ConfigOptions.key("hbase.zookeeper.quorum")
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> BIFANG_SERVER_URI =
+            ConfigOptions.key("bifang.server.uri")
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> KNOWLEDGE_BASE_URL =
+            ConfigOptions.key("knowledge.base.uri")
+                    .stringType()
+                    .noDefaultValue();
+
+    //==============================The following variables have default values=====================================
+
+    /**
+     * Configuration option for the source parallelism used in the source.
+     */
+    public static final ConfigOption<Integer> SOURCE_PARALLELISM =
+            ConfigOptions.key("source.parallelism")
+                    .intType()
+                    .defaultValue(1);
+
+    public static final ConfigOption<Integer> Flink_FIRST_AGG_PATALLELISM =
+            ConfigOptions.key("flink.first.agg.parallelism")
+                    .intType()
+                    .defaultValue(1);
+
+    public static final ConfigOption<Integer> FLINK_DETECTION_MAP_PARALLELISM =
+            ConfigOptions.key("flink.detection.map.parallelism")
+                    .intType()
+                    .defaultValue(1);
+
+    public static final ConfigOption<Integer> KAFKA_SINK_EVENT_PARALLELISM =
+            ConfigOptions.key("kafka.sink.event.parallelism")
+                    .intType()
+                    .defaultValue(1);
+
+    public static final ConfigOption<Integer> KAFKA_SINK_METRIC_PARALLELISM =
+            ConfigOptions.key("kafka.sink.metric.parallelism")
+                    .intType()
+                    .defaultValue(1);
+
+    public static final ConfigOption<String> IP_BUILTIN_KD_ID =
+            ConfigOptions.key("ip.builtin.kd.id")
+                    .stringType()
+                    .defaultValue("64af7077-eb9b-4b8f-80cf-2ceebc89bea9");
+
+    public static final ConfigOption<String> IP_USER_DEFINED_KD_ID =
+            ConfigOptions.key("ip.user.defined.kd.id")
+                    .stringType()
+                    .defaultValue("004390bc-3135-4a6f-a492-3662ecb9e289");
+
+    public static final ConfigOption<Integer> HTTP_SOCKET_TIMEOUT =
+            ConfigOptions.key("http.socket.timeout")
+                    .intType()
+                    .defaultValue(90000);
+
+    public static final ConfigOption<String> KNOWLEDGE_BASE_PATH =
+            ConfigOptions.key("knowledge.base.path")
+                    .stringType()
+                    .defaultValue("/v1/knowledge_base");
+
+    public static final ConfigOption<Integer> STATIC_THRESHOLD_SCHEDULE_MINUTES =
+            ConfigOptions.key("static.threshold.schedule.minutes")
+                    .intType()
+                    .defaultValue(10);
+
+    public static final ConfigOption<Integer> BASELINE_THRESHOLD_SCHEDULE_DAYS =
+            ConfigOptions.key("baseline.threshold.schedule.days")
+                    .intType()
+                    .defaultValue(7);
+
+    public static final ConfigOption<Integer> STATIC_SENSITIVITY_THRESHOLD =
+            ConfigOptions.key("static.sensitivity.threshold")
+                    .intType()
+                    .defaultValue(1);
+
+    public static final ConfigOption<Double> BASELINE_SENSITIVITY_THRESHOLD =
+            ConfigOptions.key("baseline.sensitivity.threshold")
+                    .doubleType()
+                    .defaultValue(0.2);
+
+    public static final ConfigOption<Double> BASELINE_SESSIONS_MINOR_THRESHOLD =
+            ConfigOptions.key("baseline.sessions.minor.threshold")
+                    .doubleType()
+                    .defaultValue(0.2);
+
+    public static final ConfigOption<Double> BASELINE_SESSIONS_WARNING_THRESHOLD =
+            ConfigOptions.key("baseline.sessions.warning.threshold")
+                    .doubleType()
+                    .defaultValue(1.0);
+
+    public static final ConfigOption<Double> BASELINE_SESSIONS_MAJOR_THRESHOLD =
+            ConfigOptions.key("baseline.sessions.major.threshold")
+                    .doubleType()
+                    .defaultValue(2.5);
+
+    public static final ConfigOption<Double> BASELINE_SESSIONS_SEVERE_THRESHOLD =
+            ConfigOptions.key("baseline.sessions.severe.threshold")
+                    .doubleType()
+                    .defaultValue(5.0);
+
+    public static final ConfigOption<Double> BASELINE_SESSIONS_CRITICAL_THRESHOLD =
+            ConfigOptions.key("baseline.sessions.critical.threshold")
+                    .doubleType()
+                    .defaultValue(8.0);
+
+    public static final ConfigOption<String> BIFANG_SERVER_ENCRYPTPWD_PATH =
+            ConfigOptions.key("bifang.server.encryptpwd.path")
+                    .stringType()
+                    .defaultValue("/v1/user/encryptpwd");
+
+    public static final ConfigOption<String> BIFANG_SERVER_POLICY_VSYSID_PATH =
+            ConfigOptions.key("bifang.server.policy.vaysid.path")
+                    .stringType()
+                    .defaultValue("/v1/admin/vsys");
+
+    public static final ConfigOption<String> BIFANG_SERVER_TOKEN =
+            ConfigOptions.key("bifang.server.token")
+                    .stringType()
+                    .defaultValue("aa2bdec5518ad131f71944b13ce5c298&1&");
+
+    public static final ConfigOption<String> BIFANG_SERVER_POLICY_THRESHOLD_PATH =
+            ConfigOptions.key("bifang.server.policy.threshold.path")
+                    .stringType()
+                    .defaultValue("/v1/policy/profile/dos_detection");
+
+    public static final ConfigOption<String> BIFANG_SERVER_LOGIN_PATH =
+            ConfigOptions.key("bifang.server.login.path")
+                    .stringType()
+                    .defaultValue("/v1/user/login");
+
+    public static final ConfigOption<Integer> HTTP_POOL_MAX_CONNECTION =
+            ConfigOptions.key("http.pool.max.connection")
+                    .intType()
+                    .defaultValue(400);
+
+    public static final ConfigOption<Integer> HTTP_POOL_MAX_PER_ROUTE =
+            ConfigOptions.key("http.pool.max.per.route")
+                    .intType()
+                    .defaultValue(80);
+
+    public static final ConfigOption<Integer> HTTP_POOL_REQUEST_TIMEOUT =
+            ConfigOptions.key("http.pool.request.timeout")
+                    .intType()
+                    .defaultValue(60000);
+
+    public static final ConfigOption<Integer> HTTP_POOL_CONNECT_TIMEOUT =
+            ConfigOptions.key("http.pool.connect.timeout")
+                    .intType()
+                    .defaultValue(60000);
+
+    public static final ConfigOption<Integer> DATA_CENTER_ID_NUM =
+            ConfigOptions.key("data.center.id.num")
+                    .intType()
+                    .defaultValue(15);
+
+    public static final ConfigOption<Integer> HBASE_CLIENT_OPERATION_TIMEOUT =
+            ConfigOptions.key("hbase.client.operation.timeout")
+                    .intType()
+                    .defaultValue(30000);
+
+    public static final ConfigOption<Integer> HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD =
+            ConfigOptions.key("hbase.client.scanner.timeout.period")
+                    .intType()
+                    .defaultValue(30000);
+
+    public static final ConfigOption<String> HBASE_BASELINE_TABLE_NAME =
+            ConfigOptions.key("hbase.baseline.table.name")
+                    .stringType()
+                    .defaultValue("dos:ddos_traffic_baselines");
+
+    public static final ConfigOption<Integer> HBASE_BASELINE_TTL =
+            ConfigOptions.key("hbase.baseline.ttl")
+                    .intType()
+                    .defaultValue(10);
+
+    public static final ConfigOption<Integer> HBASE_BASELINE_TOTAL_NUM =
+            ConfigOptions.key("hbase.baseline.total.num")
+                    .intType()
+                    .defaultValue(1000000);
+
+    public static final ConfigOption<Integer> DESTINATION_IP_PARTITION_NUM =
+            ConfigOptions.key("destination.ip.partition.num")
+                    .intType()
+                    .defaultValue(10000);
+
+    public static final ConfigOption<Integer> SOURCE_IP_LIST_LIMIT =
+            ConfigOptions.key("source.ip.list.limit")
+                    .intType()
+                    .defaultValue(10000);
+
+    /**
+     * Knowledge base scheduling cycle, in minutes
+     */
+    public static final ConfigOption<Long> KNOWLEDGE_BASE_SCHEDULE_MINUTES =
+            ConfigOptions.key("knowledge.base.schedule.minutes")
+                    .longType()
+                    .defaultValue(60L);
+
+    public static final ConfigOption<String> JOB_NAME =
+            ConfigOptions.key("job.name")
+                    .stringType()
+                    .defaultValue("detection_dos_attack")
+                    .withDescription("The flink job name.");
+
+}
```
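These `ConfigOption`s follow Flink's standard pattern: a typed key plus an optional default, resolved against whatever `Configuration` is handed to the job. A small sketch of how the options above would be read (the topic value is illustrative):

```java
import org.apache.flink.configuration.Configuration;

import static com.zdjizhi.conf.DosConfigs.*;

public class DosConfigsDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(SOURCE_KAFKA_TOPIC, "dos-sketch-logs"); // no default, so it must be set

        String topic = conf.get(SOURCE_KAFKA_TOPIC);
        int sourceParallelism = conf.get(SOURCE_PARALLELISM);                 // falls back to 1
        double criticalBand = conf.get(BASELINE_SESSIONS_CRITICAL_THRESHOLD); // falls back to 8.0

        System.out.printf("%s / %d / %.1f%n", topic, sourceParallelism, criticalBand);
    }
}
```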
src/main/java/com/zdjizhi/conf/DosConfiguration.java (new file, 36 lines)

```diff
@@ -0,0 +1,36 @@
+package com.zdjizhi.conf;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Properties;
+
+public class DosConfiguration {
+    private final Configuration config;
+
+    public DosConfiguration(final Configuration config) {
+        this.config = config;
+    }
+
+    /**
+     * Retrieves properties from the underlying `Configuration` instance that start with the specified
+     * `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
+     *
+     * @param prefix The prefix to filter properties.
+     * @return A `java.util.Properties` object containing the properties with the specified prefix.
+     */
+    public Properties getProperties(final String prefix) {
+        if (prefix == null) {
+            final Properties props = new Properties();
+            props.putAll(config.toMap());
+            return props;
+        }
+        return config.toMap()
+                .entrySet()
+                .stream()
+                .filter(entry -> entry.getKey().startsWith(prefix))
+                .collect(Properties::new,
+                        (props, e) -> props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
+                        Properties::putAll);
+    }
+}
```
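`getProperties` both filters by prefix and strips the prefix off the surviving keys, which is exactly the shape a Kafka client expects. A usage sketch (broker address and group id are made up):

```java
package com.zdjizhi.conf;

import org.apache.flink.configuration.Configuration;

import java.util.Properties;

public class DosConfigurationDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("source.kafka.props.bootstrap.servers", "broker-1:9092");
        conf.setString("source.kafka.props.group.id", "dos-detection");
        conf.setString("sink.kafka.props.acks", "all"); // different prefix, filtered out

        Properties kafkaProps = new DosConfiguration(conf)
                .getProperties(DosConfigs.SOURCE_KAFKA_PROPERTIES_PREFIX);

        // Contains bootstrap.servers=broker-1:9092 and group.id=dos-detection,
        // with the "source.kafka.props." prefix removed from both keys.
        System.out.println(kafkaProps);
    }
}
```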
src/main/java/com/zdjizhi/etl/EtlProcessFunction.java (deleted)

```diff
@@ -1,109 +0,0 @@
-package com.zdjizhi.etl;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.common.DosSketchLog;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.HashSet;
-
-import static com.zdjizhi.sink.OutputStreamSink.outputTag;
-
-/**
- * @author 94976
- */
-public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple3<String,String,Integer>, TimeWindow> {
-
-    // private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);
-    private static final Log logger = LogFactory.get();
-    private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
-    private static final String EMPTY_SOURCE_IP_IPV6 = "::";
-
-    @Override
-    public void process(Tuple3<String,String,Integer> keys,
-                        Context context, Iterable<DosSketchLog> elements,
-                        Collector<DosSketchLog> out) {
-        DosSketchLog middleResult = getMiddleResult(keys, elements);
-        try {
-            if (middleResult != null){
-                out.collect(middleResult);
-                logger.debug("Fetched intermediate aggregation result: {}", middleResult.toString());
-                context.output(outputTag, TrafficServerIpMetrics.getOutputMetric(middleResult));
-            }
-        }catch (Exception e){
-            logger.error("Failed to fetch intermediate aggregation result, middleResult: {}\n{}", middleResult.toString(), e);
-        }
-    }
-
-    private DosSketchLog getMiddleResult(Tuple3<String,String,Integer> keys, Iterable<DosSketchLog> elements){
-
-        DosSketchLog midResuleLog = new DosSketchLog();
-        Tuple7<Long, Long, Long,String,Long,Long,Long> values = sketchAggregate(elements);
-        try {
-            if (values != null){
-                midResuleLog.setAttack_type(keys.f0);
-                midResuleLog.setDestination_ip(keys.f1);
-                midResuleLog.setVsys_id(keys.f2);
-                midResuleLog.setSketch_start_time(values.f4);
-                midResuleLog.setSketch_duration(values.f5);
-                midResuleLog.setSource_ip(values.f3);
-                midResuleLog.setSketch_sessions(values.f0);
-                midResuleLog.setSketch_packets(values.f1);
-                midResuleLog.setSketch_bytes(values.f2);
-                midResuleLog.setCommon_recv_time(values.f6);
-                return midResuleLog;
-            }
-        } catch (Exception e){
-            logger.error("Failed to load intermediate result set, keys: {} values: {}\n{}", keys, values, e);
-        }
-        return null;
-    }
-
-    private Tuple7<Long, Long, Long,String,Long,Long,Long> sketchAggregate(Iterable<DosSketchLog> elements){
-        long sessions = 0;
-        long packets = 0;
-        long bytes = 0;
-        long startTime = System.currentTimeMillis()/1000;
-        long endTime = System.currentTimeMillis()/1000;
-        long duration = 0;
-        long recvtime = 0;
-        HashSet<String> sourceIpSet = new HashSet<>();
-        try {
-            for (DosSketchLog newSketchLog : elements){
-                if (recvtime == 0){
-                    recvtime = newSketchLog.getCommon_recv_time();
-                }else if (recvtime > newSketchLog.getCommon_recv_time()){
-                    recvtime = newSketchLog.getCommon_recv_time();
-                }
-                System.out.println(newSketchLog.getCommon_recv_time());
-                String sourceIp = newSketchLog.getSource_ip();
-                if (StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV6)){
-                    sessions += newSketchLog.getSketch_sessions();
-                    packets += newSketchLog.getSketch_packets();
-                    bytes += newSketchLog.getSketch_bytes();
-                    startTime = newSketchLog.getSketch_start_time() > startTime ? startTime : newSketchLog.getSketch_start_time();
-                    endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime;
-                    duration = endTime - startTime == 0 ? 5 : endTime - startTime;
-                }else {
-                    if (sourceIpSet.size() < FlowWriteConfig.SOURCE_IP_LIST_LIMIT){
-                        sourceIpSet.add(sourceIp);
-                    }
-                }
-            }
-
-            String sourceIpList = StringUtils.join(sourceIpSet, ",");
-            return Tuple7.of(sessions/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME, packets/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,
-                    bytes*8/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME, sourceIpList, startTime, duration, recvtime);
-        }catch (Exception e){
-            logger.error("Failed to aggregate intermediate result set {}", e);
-        }
-        return null;
-    }
-
-}
```
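The tuple built at the end of `sketchAggregate` converts per-window totals into per-second rates: counts are divided by the window length, and bytes are multiplied by 8 first so the result is bits per second. A worked sketch of that arithmetic (the 5-second window is an assumption; `flink.window.max.time` supplies the real value):

```java
public class RateMath {
    public static void main(String[] args) {
        long windowSeconds = 5;  // assumed flink.window.max.time
        long sessions = 1200, packets = 90000, bytes = 6250000;

        System.out.println(sessions / windowSeconds);   // 240 sessions/sec
        System.out.println(packets / windowSeconds);    // 18000 packets/sec
        System.out.println(bytes * 8 / windowSeconds);  // 10000000 bits/sec
    }
}
```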
src/main/java/com/zdjizhi/etl/ParseSketchLog.java (deleted)

```diff
@@ -1,84 +0,0 @@
-package com.zdjizhi.etl;
-
-import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.utils.StringUtil;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.common.DosSketchLog;
-import com.zdjizhi.source.DosSketchSource;
-import com.zdjizhi.utils.FlinkEnvironmentUtils;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.*;
-
-/**
- * @author wlh
- */
-public class ParseSketchLog {
-
-    private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class);
-
-    public static SingleOutputStreamOperator<DosSketchLog> getSketchSource() {
-        return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy());
-    }
-
-    private static SingleOutputStreamOperator<DosSketchLog> flatSketchSource() {
-        return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog());
-    }
-
-    private static WatermarkStrategy<DosSketchLog> createWatermarkStrategy() {
-        return WatermarkStrategy
-                .<DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
-                .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
-    }
-
-    private static class FlatSketchLog implements FlatMapFunction<String, DosSketchLog> {
-        @Override
-        public void flatMap(String s, Collector<DosSketchLog> collector) {
-            try {
-                if (StringUtil.isNotBlank(s)) {
-
-                    final long recv_time = System.currentTimeMillis()/1000;
-
-                    HashMap<String, Object> sketchSource = JSONObject.parseObject(s, HashMap.class);
-                    long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
-                    long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
-                    String attackType = sketchSource.get("attack_type").toString();
-                    int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
-                    String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list"));
-
-                    ArrayList<HashMap<String, Object>> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class);
-
-                    for (HashMap<String, Object> obj : reportIpList) {
-                        DosSketchLog dosSketchLog = new DosSketchLog();
-                        dosSketchLog.setCommon_recv_time(recv_time);
-                        dosSketchLog.setSketch_start_time(sketchStartTime);
-                        dosSketchLog.setSketch_duration(sketchDuration);
-                        dosSketchLog.setAttack_type(attackType);
-                        dosSketchLog.setVsys_id(vsysId);
-                        String sourceIp = obj.get("source_ip").toString();
-                        String destinationIp = obj.get("destination_ip").toString();
-                        long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
-                        long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
-                        long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
-                        dosSketchLog.setSource_ip(sourceIp);
-                        dosSketchLog.setDestination_ip(destinationIp);
-                        dosSketchLog.setSketch_sessions(sketchSessions);
-                        dosSketchLog.setSketch_packets(sketchPackets);
-                        dosSketchLog.setSketch_bytes(sketchBytes);
-                        collector.collect(dosSketchLog);
-                        logger.debug("Parsed record successfully: {}", dosSketchLog.toString());
-                    }
-                }
-            } catch (Exception e) {
-                logger.error("Failed to parse record: {} \n{}", s, e);
-            }
-        }
-    }
-}
```
src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java (deleted)

```diff
@@ -1,36 +0,0 @@
-package com.zdjizhi.etl;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.common.DosMetricsLog;
-import com.zdjizhi.common.DosSketchLog;
-
-class TrafficServerIpMetrics {
-
-    // private static final Logger logger = LoggerFactory.getLogger(TrafficServerIpMetrics.class);
-    private static final Log logger = LogFactory.get();
-
-    static DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) {
-        DosMetricsLog dosMetricsLog = new DosMetricsLog();
-        dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis()/1000));
-        dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip());
-        dosMetricsLog.setAttack_type(midResuleLog.getAttack_type());
-        dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions());
-        dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets());
-        dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes());
-        dosMetricsLog.setVsys_id(midResuleLog.getVsys_id());
-        dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip()));
-        logger.debug("Metric result loaded: {}", dosMetricsLog.toString());
-        return dosMetricsLog;
-    }
-
-    private static long timeFloor(long sketchStartTime){
-        return sketchStartTime / FlowWriteConfig.FLINK_WINDOW_MAX_TIME * FlowWriteConfig.FLINK_WINDOW_MAX_TIME;
-    }
-
-    private static int getPartitionNumByIp(String destinationIp){
-        return Math.abs(destinationIp.hashCode()) % FlowWriteConfig.DESTINATION_IP_PARTITION_NUM;
-    }
-}
```
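Two details of the deleted helper are worth spelling out: `timeFloor` relies on integer division to snap a timestamp down to the window boundary, and the partition number is a stable hash of the destination IP modulo the partition count. A small sketch (the 5-second window and the IP are assumptions for illustration):

```java
public class MetricHelpersDemo {
    public static void main(String[] args) {
        long window = 5;                          // assumed window size in seconds
        long ts = 1_700_000_003L;
        System.out.println(ts / window * window); // 1700000000 — floored to the boundary

        int partitions = 10_000;                  // destination.ip.partition.num default
        String destinationIp = "192.0.2.10";      // illustrative address
        System.out.println(Math.abs(destinationIp.hashCode()) % partitions);
    }
}
```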
@@ -1,16 +1,18 @@
|
|||||||
package com.zdjizhi.etl;
|
package com.zdjizhi.function;
|
||||||
|
|
||||||
import cn.hutool.log.Log;
|
import cn.hutool.log.Log;
|
||||||
import cn.hutool.log.LogFactory;
|
import cn.hutool.log.LogFactory;
|
||||||
import com.geedgenetworks.utils.DateUtils;
|
import com.geedgenetworks.utils.DateUtils;
|
||||||
import com.geedgenetworks.utils.StringUtil;
|
import com.geedgenetworks.utils.StringUtil;
|
||||||
import com.zdjizhi.common.*;
|
import com.zdjizhi.common.*;
|
||||||
import com.zdjizhi.utils.*;
|
import com.zdjizhi.utils.Snowflakeld.SnowflakeId;
|
||||||
import com.zdjizhi.utils.connections.nacos.NacosUtils;
|
import com.zdjizhi.utils.Threshold.ParseBaselineThreshold;
|
||||||
|
import com.zdjizhi.utils.Threshold.ParseStaticThreshold;
|
||||||
|
import com.zdjizhi.utils.connections.http.HttpClientService;
|
||||||
|
import com.zdjizhi.utils.knowledgebase.IpLookupUtils;
|
||||||
import inet.ipaddr.IPAddress;
|
import inet.ipaddr.IPAddress;
|
||||||
import inet.ipaddr.IPAddressString;
|
import inet.ipaddr.IPAddressString;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
|
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
|
||||||
import org.apache.flink.streaming.api.functions.ProcessFunction;
|
import org.apache.flink.streaming.api.functions.ProcessFunction;
|
||||||
@@ -19,48 +21,88 @@ import org.apache.flink.util.Collector;
|
|||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.text.NumberFormat;
|
import java.text.NumberFormat;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import static com.zdjizhi.conf.DosConfigs.*;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author wlh
|
* @author wlh
|
||||||
*/
|
*/
|
||||||
public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
|
public class DosDetectionFunction extends ProcessFunction<DosSketchLog, DosEventLog> {
|
||||||
|
|
||||||
private static final Log logger = LogFactory.get();
|
private static final Log logger = LogFactory.get();
|
||||||
private static Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
|
private Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
|
||||||
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
|
private final NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
|
||||||
private HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap;
|
private HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap;
|
||||||
|
private final int BASELINE_SIZE = 144;
|
||||||
private final static int BASELINE_SIZE = 144;
|
private final int STATIC_CONDITION_TYPE = 1;
|
||||||
private final static int STATIC_CONDITION_TYPE = 1;
|
private final int BASELINE_CONDITION_TYPE = 2;
|
||||||
private final static int BASELINE_CONDITION_TYPE = 2;
|
private final int SENSITIVITY_CONDITION_TYPE = 3;
|
||||||
private final static int SENSITIVITY_CONDITION_TYPE = 3;
|
private final String SESSIONS_TAG = "sessions";
|
||||||
|
private final String PACKETS_TAG = "packets";
|
||||||
private final static String SESSIONS_TAG = "sessions";
|
private final String BITS_TAG = "bits";
|
||||||
private final static String PACKETS_TAG = "packets";
|
private final int OTHER_BASELINE_TYPE = 3;
|
||||||
private final static String BITS_TAG = "bits";
|
private SnowflakeId snowflakeId;
|
||||||
|
private Configuration configuration;
|
||||||
private final static int OTHER_BASELINE_TYPE = 3;
|
private HttpClientService httpClientService;
|
||||||
|
private IpLookupUtils ipLookupUtils;
|
||||||
|
private ParseBaselineThreshold parseBaselineThresholdld;
|
||||||
|
private ParseStaticThreshold parseStaticThreshold;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void open(Configuration parameters) {
|
public void open(Configuration parameters) {
|
||||||
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
|
|
||||||
new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build());
|
|
||||||
try {
|
|
||||||
super.open(parameters);
|
|
||||||
executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0,
|
|
||||||
FlowWriteConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES);
|
|
||||||
|
|
||||||
executorService.scheduleAtFixedRate(() -> baselineMap = ParseBaselineThreshold.readFromHbase(), 0,
|
configuration = (Configuration) getRuntimeContext()
|
||||||
FlowWriteConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
|
.getExecutionConfig().getGlobalJobParameters();
|
||||||
|
httpClientService = new HttpClientService(configuration);
|
||||||
|
|
||||||
|
snowflakeId = new SnowflakeId(configuration.get(DATA_CENTER_ID_NUM), getRuntimeContext().getIndexOfThisSubtask());
|
||||||
|
|
||||||
|
try {
|
||||||
|
ipLookupUtils = new IpLookupUtils(configuration, httpClientService);
|
||||||
|
ipLookupUtils.stuffKnowledgeMetaCache();
|
||||||
|
Timer timer = new Timer();
|
||||||
|
timer.schedule(new TimerTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
ipLookupUtils.stuffKnowledgeMetaCache();
|
||||||
|
logger.info("定位库定时调度成功");
|
||||||
|
}
|
||||||
|
}, configuration.get(KNOWLEDGE_BASE_SCHEDULE_MINUTES) * 60 * 1000, configuration.get(KNOWLEDGE_BASE_SCHEDULE_MINUTES) * 60 * 1000);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("定时器任务执行失败", e);
|
logger.error("定位库加载失败,具体原因为" + e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
parseStaticThreshold = new ParseStaticThreshold(configuration, httpClientService);
|
||||||
|
thresholdRangeMap = parseStaticThreshold.createStaticThreshold();
|
||||||
|
Timer timer = new Timer();
|
||||||
|
timer.schedule(new TimerTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
thresholdRangeMap = parseStaticThreshold.createStaticThreshold();
|
||||||
|
logger.info("基于静态阈值构建threshold RangeMap成功,Threshold RangeMap:" + thresholdRangeMap.toString());
|
||||||
|
}
|
||||||
|
}, configuration.get(STATIC_THRESHOLD_SCHEDULE_MINUTES) * 60 * 1000, configuration.get(STATIC_THRESHOLD_SCHEDULE_MINUTES) * 60 * 1000);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("基于静态阈值构建threshold RangeMap失败,失败原因为:" + e);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
parseBaselineThresholdld = new ParseBaselineThreshold(configuration);
|
||||||
|
baselineMap = parseBaselineThresholdld.readFromHbase();
|
||||||
|
Timer timer = new Timer();
|
||||||
|
timer.schedule(new TimerTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
baselineMap = parseBaselineThresholdld.readFromHbase();
|
||||||
|
logger.info("从Hbase获取baselineMap成功,baselineMap:" + thresholdRangeMap.toString());
|
||||||
|
}
|
||||||
|
}, configuration.get(BASELINE_THRESHOLD_SCHEDULE_DAYS) * 24 * 60 * 60 * 1000, configuration.get(BASELINE_THRESHOLD_SCHEDULE_DAYS) * 24 * 60 * 60 * 1000);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("从Hbase获取baselineMap失败,失败原因为:" + e);
|
||||||
|
}
|
||||||
|
|
||||||
PERCENT_INSTANCE.setMinimumFractionDigits(2);
|
PERCENT_INSTANCE.setMinimumFractionDigits(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -93,7 +135,6 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("判定失败\n {} \n{}", value, e);
|
logger.error("判定失败\n {} \n{}", value, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (finalResult != null) {
|
if (finalResult != null) {
|
||||||
out.collect(finalResult);
|
out.collect(finalResult);
|
||||||
}
|
}
|
||||||
@@ -102,7 +143,7 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
|
|||||||
|
|
||||||
private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
|
private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
|
||||||
long sketchSessions = value.getSketch_sessions();
|
long sketchSessions = value.getSketch_sessions();
|
||||||
Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold");
|
Integer staticSensitivityThreshold = configuration.get(STATIC_SENSITIVITY_THRESHOLD);
|
||||||
long diff = sketchSessions - staticSensitivityThreshold;
|
long diff = sketchSessions - staticSensitivityThreshold;
|
||||||
return getDosEventLog(value, staticSensitivityThreshold, diff, 0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
|
return getDosEventLog(value, staticSensitivityThreshold, diff, 0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
|
||||||
}
|
}
|
||||||
@@ -128,7 +169,7 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
|
|||||||
double diffSessionPercent = 0.0;
|
double diffSessionPercent = 0.0;
|
||||||
double diffPktPercent = 0.0;
|
double diffPktPercent = 0.0;
|
||||||
double diffBitPercent = 0.0;
|
double diffBitPercent = 0.0;
|
||||||
//todo 代码Review发现该部分存在bug,23.11版本做修复,需测试。
|
|
||||||
if (sessionBase > 0) {
|
if (sessionBase > 0) {
|
||||||
diffSessionPercent = getDiffPercent(diffSession, sessionBase) * 100;
|
diffSessionPercent = getDiffPercent(diffSession, sessionBase) * 100;
|
||||||
}
|
}
|
||||||
@@ -143,13 +184,13 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
|
|||||||
DosEventLog result = null;
|
DosEventLog result = null;
|
||||||
|
|
||||||
if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent) {
|
if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent) {
|
||||||
profileId = threshold.getProfile_id();
|
profileId = threshold.getId();
|
||||||
result = getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
|
result = getDosEventLog(value, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
|
||||||
} else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent) {
|
} else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent) {
|
||||||
profileId = threshold.getProfile_id();
|
profileId = threshold.getId();
|
||||||
result = getDosEventLog(value, pktBase, diffPkt, profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
|
result = getDosEventLog(value, pktBase, diffPkt, profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
|
||||||
} else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent) {
|
} else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent) {
|
||||||
profileId = threshold.getProfile_id();
|
profileId = threshold.getId();
|
||||||
result = getDosEventLog(value, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
|
result = getDosEventLog(value, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
@@ -162,14 +203,13 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
|
|||||||
if (diff > 0 && base != 0) {
|
if (diff > 0 && base != 0) {
|
||||||
double percent = getDiffPercent(diff, base);
|
double percent = getDiffPercent(diff, base);
|
||||||
Severity severity = judgeSeverity(percent);
|
Severity severity = judgeSeverity(percent);
|
||||||
Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold");
|
Integer staticSensitivityThreshold = configuration.get(STATIC_SENSITIVITY_THRESHOLD);
|
||||||
if (severity != Severity.NORMAL) {
|
if (severity != Severity.NORMAL) {
|
||||||
if (type == BASELINE_CONDITION_TYPE && percent < NacosUtils.getDoubleProperty("baseline.sensitivity.threshold")) {
|
if (type == BASELINE_CONDITION_TYPE && percent < configuration.get(BASELINE_SENSITIVITY_THRESHOLD)) {
|
||||||
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
|
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
|
||||||
} else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold) {
|
} else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold) {
|
||||||
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
|
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
|
||||||
} else {
|
} else {
|
||||||
// result = getResult(value, base, profileId, severity, percent+1, type, tag);
|
|
||||||
result = getResult(value, base, profileId, severity, percent, type, tag);
|
result = getResult(value, base, profileId, severity, percent, type, tag);
|
||||||
if (type == SENSITIVITY_CONDITION_TYPE) {
|
if (type == SENSITIVITY_CONDITION_TYPE) {
|
||||||
result.setSeverity(Severity.MAJOR.severity);
|
result.setSeverity(Severity.MAJOR.severity);
|
||||||
@@ -186,7 +226,7 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
|
|||||||
private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) {
|
private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) {
|
||||||
DosEventLog dosEventLog = new DosEventLog();
|
DosEventLog dosEventLog = new DosEventLog();
|
||||||
dosEventLog.setRecv_time(value.getCommon_recv_time());
|
dosEventLog.setRecv_time(value.getCommon_recv_time());
|
||||||
-            dosEventLog.setLog_id(SnowflakeId.generateId());
+            dosEventLog.setLog_id(snowflakeId.nextId());
             dosEventLog.setVsys_id(value.getVsys_id());
             dosEventLog.setStart_time(value.getSketch_start_time());
             dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration());
@@ -194,9 +234,8 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
             dosEventLog.setAttack_type(value.getAttack_type());
             dosEventLog.setSeverity(severity.severity);
             dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag, dosEventLog));
-            // dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag, dosEventLog));
             dosEventLog.setDestination_ip(value.getDestination_ip());
-            dosEventLog.setDestination_country(IpLookupUtils.getCountryLookup(value.getDestination_ip()));
+            dosEventLog.setDestination_country(ipLookupUtils.getCountryLookup(value.getDestination_ip()));
             String ipList = value.getSource_ip();
             dosEventLog.setSource_ip_list(ipList);
             dosEventLog.setSource_country_list(getSourceCountryList(ipList));
@@ -220,8 +259,8 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
                 logger.debug("Current IP: {}, attack type: {}: baseline is 0, falling back to the P95 observed value {}", value.getDestination_ip(), value.getAttack_type(), defaultVaule);
                 base = defaultVaule;
             }
-            if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < NacosUtils.getIntProperty("static.sensitivity.threshold")) {
-                base = NacosUtils.getIntProperty("static.sensitivity.threshold");
+            if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < configuration.get(STATIC_SENSITIVITY_THRESHOLD)) {
+                base = configuration.get(STATIC_SENSITIVITY_THRESHOLD);
             }
         }
     }
@@ -268,12 +307,12 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
         String[] ipArr = sourceIpList.split(",");
         HashSet<String> countrySet = new HashSet<>();
         for (String ip : ipArr) {
-            String country = IpLookupUtils.getCountryLookup(ip);
+            String country = ipLookupUtils.getCountryLookup(ip);
             if (StringUtil.isNotBlank(country)) {
                 countrySet.add(country);
             }
         }
-        countryList = StringUtils.join(countrySet, ", ");
+        countryList = StringUtils.join(countrySet, ",");
         return countryList;
     } catch (Exception e) {
         logger.error("{} source IP lists: country lookup failed", sourceIpList, e);
@@ -297,7 +336,6 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
     }

     private Double getDiffPercent(long diff, long base) {
         try {
             return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
@@ -309,15 +347,15 @@ public class DosDetection extends ProcessFunction<DosSketchLog, DosEventLog> {
     }

     private Severity judgeSeverity(double diffPercent) {
-        if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold")) {
+        if (diffPercent >= configuration.get(BASELINE_SESSIONS_MINOR_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_WARNING_THRESHOLD)) {
             return Severity.MINOR;
-        } else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.major.threshold")) {
+        } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_WARNING_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_MAJOR_THRESHOLD)) {
             return Severity.WARNING;
-        } else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.major.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold")) {
+        } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_MAJOR_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_SEVERE_THRESHOLD)) {
             return Severity.MAJOR;
-        } else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold")) {
+        } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_SEVERE_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_CRITICAL_THRESHOLD)) {
             return Severity.SEVERE;
-        } else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold")) {
+        } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_CRITICAL_THRESHOLD)) {
             return Severity.CRITICAL;
         } else {
             return Severity.NORMAL;
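The chained if/else in judgeSeverity reads each threshold twice per branch. The same banding can be expressed as a single floor lookup on a sorted map; the sketch below is not part of this change, and the class name and constructor are illustrative only.

import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch only: the same band-to-severity mapping as judgeSeverity,
// expressed as one floorEntry lookup instead of chained comparisons.
class SeverityBands {
    enum Severity { NORMAL, MINOR, WARNING, MAJOR, SEVERE, CRITICAL }

    private final NavigableMap<Double, Severity> bands = new TreeMap<>();

    SeverityBands(double minor, double warning, double major, double severe, double critical) {
        // Anything below the MINOR threshold maps to NORMAL.
        bands.put(Double.NEGATIVE_INFINITY, Severity.NORMAL);
        bands.put(minor, Severity.MINOR);
        bands.put(warning, Severity.WARNING);
        bands.put(major, Severity.MAJOR);
        bands.put(severe, Severity.SEVERE);
        bands.put(critical, Severity.CRITICAL);
    }

    Severity judge(double diffPercent) {
        // floorEntry picks the band whose lower bound is <= diffPercent.
        return bands.floorEntry(diffPercent).getValue();
    }
}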
141 src/main/java/com/zdjizhi/function/EtlProcessFunction.java Normal file
@@ -0,0 +1,141 @@
package com.zdjizhi.function;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.DosMetricsLog;
import com.zdjizhi.common.DosSketchLog;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.HashSet;

import static com.zdjizhi.conf.DosConfigs.*;

/**
 * @author 94976
 */
public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple3<String, String, Integer>, TimeWindow> {

    private static final Log logger = LogFactory.get();
    private final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
    private final String EMPTY_SOURCE_IP_IPV6 = "::";
    public static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics") {
    };
    private Configuration configuration;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        configuration = (Configuration) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
    }

    @Override
    public void process(Tuple3<String, String, Integer> keys,
                        Context context, Iterable<DosSketchLog> elements,
                        Collector<DosSketchLog> out) {
        DosSketchLog middleResult = getMiddleResult(keys, elements);
        try {
            if (middleResult != null) {
                out.collect(middleResult);
                logger.debug("Intermediate aggregation result: {}", middleResult);
                context.output(outputTag, getOutputMetric(middleResult));
            }
        } catch (Exception e) {
            logger.error("Failed to emit intermediate aggregation result, middleResult: {}\n{}", middleResult, e);
        }
    }

    private DosMetricsLog getOutputMetric(DosSketchLog midResultLog) {
        DosMetricsLog dosMetricsLog = new DosMetricsLog();
        dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis() / 1000));
        dosMetricsLog.setDestination_ip(midResultLog.getDestination_ip());
        dosMetricsLog.setAttack_type(midResultLog.getAttack_type());
        dosMetricsLog.setSession_rate(midResultLog.getSketch_sessions());
        dosMetricsLog.setPacket_rate(midResultLog.getSketch_packets());
        dosMetricsLog.setBit_rate(midResultLog.getSketch_bytes());
        dosMetricsLog.setVsys_id(midResultLog.getVsys_id());
        dosMetricsLog.setPartition_num(getPartitionNumByIp(midResultLog.getDestination_ip()));
        logger.debug("Metrics record built: {}", dosMetricsLog);
        return dosMetricsLog;
    }

    private long timeFloor(long sketchStartTime) {
        return sketchStartTime / configuration.get(FLINK_WINDOW_MAX_TIME) * configuration.get(FLINK_WINDOW_MAX_TIME);
    }

    private int getPartitionNumByIp(String destinationIp) {
        return Math.abs(destinationIp.hashCode()) % configuration.get(DESTINATION_IP_PARTITION_NUM);
    }

    private DosSketchLog getMiddleResult(Tuple3<String, String, Integer> keys, Iterable<DosSketchLog> elements) {

        DosSketchLog midResultLog = new DosSketchLog();
        Tuple7<Long, Long, Long, String, Long, Long, Long> values = sketchAggregate(elements);
        try {
            if (values != null) {
                midResultLog.setAttack_type(keys.f0);
                midResultLog.setDestination_ip(keys.f1);
                midResultLog.setVsys_id(keys.f2);
                midResultLog.setSketch_start_time(values.f4);
                midResultLog.setSketch_duration(values.f5);
                midResultLog.setSource_ip(values.f3);
                midResultLog.setSketch_sessions(values.f0);
                midResultLog.setSketch_packets(values.f1);
                midResultLog.setSketch_bytes(values.f2);
                midResultLog.setCommon_recv_time(values.f6);
                return midResultLog;
            }
        } catch (Exception e) {
            logger.error("Failed to build intermediate result, keys: {} values: {}\n{}", keys, values, e);
        }
        return null;
    }

    private Tuple7<Long, Long, Long, String, Long, Long, Long> sketchAggregate(Iterable<DosSketchLog> elements) {
        long sessions = 0;
        long packets = 0;
        long bytes = 0;
        long startTime = System.currentTimeMillis() / 1000;
        long endTime = System.currentTimeMillis() / 1000;
        long duration = 0;
        long recvtime = 0;
        HashSet<String> sourceIpSet = new HashSet<>();
        try {
            for (DosSketchLog newSketchLog : elements) {
                // Keep the earliest receive time seen in the window.
                if (recvtime == 0 || recvtime > newSketchLog.getCommon_recv_time()) {
                    recvtime = newSketchLog.getCommon_recv_time();
                }
                String sourceIp = newSketchLog.getSource_ip();
                if (StringUtils.equals(sourceIp, EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp, EMPTY_SOURCE_IP_IPV6)) {
                    // Rows with an empty source IP carry the per-destination totals.
                    sessions += newSketchLog.getSketch_sessions();
                    packets += newSketchLog.getSketch_packets();
                    bytes += newSketchLog.getSketch_bytes();
                    startTime = Math.min(startTime, newSketchLog.getSketch_start_time());
                    endTime = Math.max(endTime, newSketchLog.getSketch_start_time());
                    duration = endTime - startTime == 0 ? 5 : endTime - startTime;
                } else {
                    // Other rows contribute source IPs, capped by the configured limit.
                    if (sourceIpSet.size() < configuration.get(SOURCE_IP_LIST_LIMIT)) {
                        sourceIpSet.add(sourceIp);
                    }
                }
            }
            String sourceIpList = StringUtils.join(sourceIpSet, ",");
            return Tuple7.of(sessions / configuration.get(FLINK_WINDOW_MAX_TIME), packets / configuration.get(FLINK_WINDOW_MAX_TIME),
                    bytes * 8 / configuration.get(FLINK_WINDOW_MAX_TIME), sourceIpList, startTime, duration, recvtime);
        } catch (Exception e) {
            logger.error("Failed to aggregate sketch records {}", e);
        }
        return null;
    }

}
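Both timeFloor and getPartitionNumByIp above are plain integer arithmetic. A worked example, assuming a 5-second window and 8 partitions (neither value is confirmed by this diff):

// Assuming configuration.get(FLINK_WINDOW_MAX_TIME) == 5 (illustrative):
long window = 5L;
long now = 1_700_000_003L;             // epoch seconds
long floored = now / window * window;  // integer division drops the remainder
// floored == 1_700_000_000: the start of the 5 s window containing `now`

// Assuming configuration.get(DESTINATION_IP_PARTITION_NUM) == 8 (illustrative):
int partition = Math.abs("198.51.100.2".hashCode()) % 8;  // stable value in [0, 8)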
53 src/main/java/com/zdjizhi/function/FlatSketchFunction.java Normal file
@@ -0,0 +1,53 @@
package com.zdjizhi.function;

import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.utils.StringUtil;
import com.zdjizhi.common.DosSketchLog;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;

public class FlatSketchFunction implements FlatMapFunction<String, DosSketchLog> {
    private static final Logger logger = LoggerFactory.getLogger(FlatSketchFunction.class);

    @Override
    public void flatMap(String value, Collector<DosSketchLog> out) {
        try {
            if (StringUtil.isNotBlank(value)) {
                final long recv_time = System.currentTimeMillis() / 1000;
                HashMap<String, Object> sketchSource = JSONObject.parseObject(value, HashMap.class);
                long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString());
                long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString());
                String attackType = sketchSource.get("attack_type").toString();
                int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString());
                String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list"));
                ArrayList<HashMap<String, Object>> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class);
                for (HashMap<String, Object> obj : reportIpList) {
                    DosSketchLog dosSketchLog = new DosSketchLog();
                    dosSketchLog.setCommon_recv_time(recv_time);
                    dosSketchLog.setSketch_start_time(sketchStartTime);
                    dosSketchLog.setSketch_duration(sketchDuration);
                    dosSketchLog.setAttack_type(attackType);
                    dosSketchLog.setVsys_id(vsysId);
                    String sourceIp = obj.get("source_ip").toString();
                    String destinationIp = obj.get("destination_ip").toString();
                    long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString());
                    long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString());
                    long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
                    dosSketchLog.setSource_ip(sourceIp);
                    dosSketchLog.setDestination_ip(destinationIp);
                    dosSketchLog.setSketch_sessions(sketchSessions);
                    dosSketchLog.setSketch_packets(sketchPackets);
                    dosSketchLog.setSketch_bytes(sketchBytes);
                    out.collect(dosSketchLog);
                    logger.debug("Parsed sketch record: {}", dosSketchLog);
                }
            }
        } catch (Exception e) {
            logger.error("Failed to parse record: {} \n{}", value, e);
        }
    }
}
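For reference, a record of roughly the following shape would satisfy the parser above. Only the field names are taken from the code; every value, including the attack type, is invented for illustration:

// Hypothetical input for FlatSketchFunction.flatMap (all values invented):
String sample = "{"
        + "\"sketch_start_time\": 1700000000,"
        + "\"sketch_duration\": 5,"
        + "\"attack_type\": \"syn_flood\","
        + "\"common_vsys_id\": 1,"
        + "\"report_ip_list\": ["
        + "  {\"source_ip\": \"203.0.113.7\", \"destination_ip\": \"198.51.100.2\","
        + "   \"sketch_sessions\": 1200, \"sketch_packets\": 54000, \"sketch_bytes\": 3200000}"
        + "]}";
// Each element of report_ip_list becomes one DosSketchLog emitted downstream;
// common_vsys_id is optional and defaults to 1 via getOrDefault.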
15 src/main/java/com/zdjizhi/function/SketchKeysSelector.java Normal file
@@ -0,0 +1,15 @@
package com.zdjizhi.function;

import com.zdjizhi.common.DosSketchLog;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;

public class SketchKeysSelector implements KeySelector<DosSketchLog, Tuple3<String, String, Integer>> {
    @Override
    public Tuple3<String, String, Integer> getKey(DosSketchLog dosSketchLog) {
        return Tuple3.of(
                dosSketchLog.getAttack_type(),
                dosSketchLog.getDestination_ip(),
                dosSketchLog.getVsys_id());
    }
}
@@ -1,15 +1,84 @@
 package com.zdjizhi.main;

-import com.zdjizhi.sink.OutputStreamSink;
+import com.alibaba.fastjson2.JSONObject;
+import com.zdjizhi.common.DosEventLog;
+import com.zdjizhi.common.DosSketchLog;
+import com.zdjizhi.conf.DosConfiguration;
+import com.zdjizhi.function.DosDetectionFunction;
+import com.zdjizhi.function.EtlProcessFunction;
+import com.zdjizhi.function.FlatSketchFunction;
+import com.zdjizhi.function.SketchKeysSelector;
+import com.zdjizhi.utils.connections.kafka.KafkaConsumer;
+import com.zdjizhi.utils.connections.kafka.KafkaProducer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import static com.zdjizhi.conf.DosConfigs.*;

 /**
- * @author wlh
+ * @author wangchengcheng
  * Main entry point of the application
  */
 public class DosDetectionApplication {

-    public static void main(String[] args) {
-        OutputStreamSink.finalOutputSink();
-    }
+    public static void main(String[] args) throws Exception {
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // Parameter check
+        if (args.length < 1) {
+            throw new IllegalArgumentException("Error: Not found properties path. " +
+                    "\nUsage: flink -c xxx xxx.jar app.properties.");
+        }
+        final ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+
+        final Configuration config = tool.getConfiguration();
+        env.getConfig().setGlobalJobParameters(config);
+
+        final DosConfiguration dosConfiguration = new DosConfiguration(config);
+
+        // Source settings
+        final DataStreamSource<String> dosStreamSource = env.addSource(KafkaConsumer.getKafkaConsumer(config.get(SOURCE_KAFKA_TOPIC), dosConfiguration
+                .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX))).setParallelism(config.get(SOURCE_PARALLELISM));
+
+        // Watermark settings
+        final WatermarkStrategy<DosSketchLog> dosSketchLogWatermarkStrategy = WatermarkStrategy.
+                <DosSketchLog>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(FLINK_WATERMARK_MAX_ORDERNESS)))
+                .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
+
+        // Data preprocessing
+        final SingleOutputStreamOperator<DosSketchLog> sketchSource = dosStreamSource.flatMap(new FlatSketchFunction())
+                .assignTimestampsAndWatermarks(dosSketchLogWatermarkStrategy);
+
+        // Windowed aggregation
+        final SingleOutputStreamOperator<DosSketchLog> middleStream = sketchSource.keyBy(new SketchKeysSelector())
+                .window(TumblingEventTimeWindows.of(Time.seconds(config.get(FLINK_WINDOW_MAX_TIME)))).process(new EtlProcessFunction())
+                .setParallelism(config.get(Flink_FIRST_AGG_PATALLELISM));
+
+        // DoS detection
+        final SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream = middleStream.process(new DosDetectionFunction())
+                .setParallelism(config.get(FLINK_DETECTION_MAP_PARALLELISM));
+
+        // DoS event output
+        dosEventLogOutputStream.filter(Objects::nonNull)
+                .map(JSONObject::toJSONString)
+                .addSink(KafkaProducer.getKafkaProducer(config.get(KAFKA_SINK_EVENT_TOPIC), dosConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)))
+                .setParallelism(config.get(KAFKA_SINK_EVENT_PARALLELISM));
+
+        // Traffic server IP metrics output
+        middleStream.getSideOutput(EtlProcessFunction.outputTag).map(JSONObject::toJSONString)
+                .addSink(KafkaProducer.getKafkaProducer(config.get(KAFKA_SINK_METRIC_TOPIC), dosConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)))
+                .setParallelism(config.get(KAFKA_SINK_METRIC_PARALLELISM));
+
+        env.execute(config.get(JOB_NAME));
+    }
 }
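One detail worth noting in the watermark setup above: Flink event-time timestamps are epoch milliseconds, while sketch_start_time is in epoch seconds, hence the * 1000 in the timestamp assigner. A worked example of the bounded-out-of-orderness arithmetic (the 10-second bound is an assumption, not taken from this diff):

// Illustrative arithmetic for forBoundedOutOfOrderness:
long sketchStartSeconds = 1_700_000_000L;          // from the parsed record
long eventTimeMillis = sketchStartSeconds * 1000;  // what withTimestampAssigner returns
long outOfOrdernessSeconds = 10;                   // assumed config value
// Flink emits watermark = maxSeenTimestamp - bound - 1 ms:
long watermark = eventTimeMillis - outOfOrdernessSeconds * 1000 - 1;
// A tumbling event-time window [t, t+w) fires once the watermark passes its end.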
@@ -1,23 +0,0 @@
package com.zdjizhi.sink;

import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.DosEventLog;
//import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import java.util.Objects;

class DosEventSink {

    static void dosEventOutputSink(SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream) {
        dosEventLogOutputStream
                .filter(Objects::nonNull)
                // .map(JsonMapper::toJsonString)
                .map(JSONObject::toJSONString)
                .addSink(KafkaUtils.getKafkaSink(FlowWriteConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
                .setParallelism(FlowWriteConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
    }

}
@@ -1,65 +0,0 @@
package com.zdjizhi.sink;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.DosEventLog;
import com.zdjizhi.common.DosMetricsLog;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.etl.DosDetection;
import com.zdjizhi.etl.EtlProcessFunction;
import com.zdjizhi.etl.ParseSketchLog;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

/**
 * @author 94976
 */
public class OutputStreamSink {
    // private static final Logger logger = LoggerFactory.getLogger(OutputStreamSink.class);
    private static final Log logger = LogFactory.get();

    public static OutputTag<DosMetricsLog> outputTag = new OutputTag<DosMetricsLog>("traffic server ip metrics"){};

    public static void finalOutputSink() {
        try {
            SingleOutputStreamOperator<DosSketchLog> middleStream = getMiddleStream();
            DosEventSink.dosEventOutputSink(getEventSinkStream(middleStream));
            TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);
            FlinkEnvironmentUtils.streamExeEnv.execute(FlowWriteConfig.STREAM_EXECUTION_JOB_NAME);
        } catch (Exception e) {
            logger.error("Job startup failed {}", e);
        }
    }

    private static SingleOutputStreamOperator<DosEventLog> getEventSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream) {
        return middleStream
                .process(new DosDetection()).setParallelism(FlowWriteConfig.FLINK_DETECTION_MAP_PARALLELISM);
    }

    private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream() {
        return ParseSketchLog.getSketchSource()
                .keyBy(new KeysSelector())
                .window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.FLINK_WINDOW_MAX_TIME)))
                .process(new EtlProcessFunction())
                .setParallelism(FlowWriteConfig.FLINK_FIRST_AGG_PARALLELISM);
    }

    private static class KeysSelector implements KeySelector<DosSketchLog, Tuple3<String, String, Integer>> {
        @Override
        public Tuple3<String, String, Integer> getKey(DosSketchLog dosSketchLog) {
            return Tuple3.of(
                    dosSketchLog.getAttack_type(),
                    dosSketchLog.getDestination_ip(),
                    dosSketchLog.getVsys_id());
        }
    }

}
@@ -1,25 +0,0 @@
package com.zdjizhi.sink;

import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.DosMetricsLog;
import com.zdjizhi.common.DosSketchLog;

import com.zdjizhi.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import static com.zdjizhi.sink.OutputStreamSink.outputTag;

class TrafficServerIpMetricsSink {

    static void sideOutputMetricsSink(SingleOutputStreamOperator<DosSketchLog> outputStream) {
        DataStream<DosMetricsLog> sideOutput = outputStream.getSideOutput(outputTag);
        // sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
        sideOutput.map(JSONObject::toJSONString).addSink(KafkaUtils.getKafkaSink(FlowWriteConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
                .setParallelism(FlowWriteConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);
    }

}
@@ -1,37 +0,0 @@
package com.zdjizhi.source;

import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FlinkEnvironmentUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * @author wlh
 */
public class DosSketchSource {

    private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv;

    public static DataStreamSource<String> createDosSketchSource() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", FlowWriteConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
        properties.setProperty("group.id", FlowWriteConfig.KAFKA_GROUP_ID);
        if (FlowWriteConfig.SASL_JAAS_CONFIG_FLAG == 1) {
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.mechanism", "PLAIN");
            properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + FlowWriteConfig.SASL_JAAS_CONFIG_USER + "\" password=\"" + FlowWriteConfig.SASL_JAAS_CONFIG_PASSWORD + "\";");
        }

        return streamExeEnv.addSource(new FlinkKafkaConsumer<String>(
                FlowWriteConfig.KAFKA_INPUT_TOPIC_NAME,
                new SimpleStringSchema(), properties))
                .setParallelism(FlowWriteConfig.KAFKA_INPUT_PARALLELISM);
    }

}
@@ -1,45 +0,0 @@
package com.zdjizhi.utils;

import java.util.Properties;

public final class CommonConfigurations {

    private static Properties propService = new Properties();

    public static String getStringProperty(String key) {
        return propService.getProperty(key);
    }

    public static Integer getIntProperty(String key) {
        return Integer.parseInt(propService.getProperty(key));
    }

    public static Double getDoubleProperty(String key) {
        return Double.parseDouble(propService.getProperty(key));
    }

    public static Long getLongProperty(String key) {
        return Long.parseLong(propService.getProperty(key));
    }

    public static Boolean getBooleanProperty(Integer type, String key) {
        return "true".equals(propService.getProperty(key).toLowerCase().trim());
    }

    static {
        try {
            propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties"));
        } catch (Exception e) {
            propService = null;
        }
    }
}
@@ -1,200 +0,0 @@
package com.zdjizhi.utils;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class DistributedLock implements Lock, Watcher {
    // private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
    private static final Log logger = LogFactory.get();

    private ZooKeeper zk = null;
    /**
     * Root node
     */
    private final String ROOT_LOCK = "/locks";
    /**
     * The resource being contended for
     */
    private String lockName;
    /**
     * The previous lock node we are waiting on
     */
    private String waitLock;
    /**
     * The current lock node
     */
    private String currentLock;
    /**
     * Latch used to wait for the previous node to disappear
     */
    private CountDownLatch countDownLatch;

    private int sessionTimeout = 2000;

    private List<Exception> exceptionList = new ArrayList<Exception>();

    /**
     * Configure the distributed lock
     *
     * @param config   ZooKeeper connection URL
     * @param lockName contended resource name
     */
    public DistributedLock(String config, String lockName) {
        this.lockName = lockName;
        try {
            // Connect to ZooKeeper
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(ROOT_LOCK, false);
            if (stat == null) {
                // Create the root node if it does not exist yet
                zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException | InterruptedException | KeeperException e) {
            logger.error("Node already exists!");
        }
    }

    /**
     * Node watcher
     */
    @Override
    public void process(WatchedEvent event) {
        if (this.countDownLatch != null) {
            this.countDownLatch.countDown();
        }
    }

    @Override
    public void lock() {
        if (exceptionList.size() > 0) {
            throw new LockException(exceptionList.get(0));
        }
        try {
            if (this.tryLock()) {
                logger.info(Thread.currentThread().getName() + " " + lockName + " acquired the lock");
            } else {
                // Wait for the lock
                waitForLock(waitLock, sessionTimeout);
            }
        } catch (InterruptedException | KeeperException e) {
            logger.error("Error while acquiring lock" + e);
        }
    }

    @Override
    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if (lockName.contains(splitStr)) {
                throw new LockException("Invalid lock name");
            }
            // Create an ephemeral sequential node
            currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // Fetch all child nodes
            List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
            // Collect all lock nodes belonging to this lockName
            List<String> lockObjects = new ArrayList<String>();
            for (String node : subNodes) {
                String tmpNode = node.split(splitStr)[0];
                if (tmpNode.equals(lockName)) {
                    lockObjects.add(node);
                }
            }
            Collections.sort(lockObjects);
            // If the current node is the smallest, the lock is acquired
            if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
                return true;
            }
            // Otherwise find the node immediately before ours
            String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
            waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
        } catch (InterruptedException | KeeperException e) {
            logger.error("Error during lock acquisition" + e);
        }
        return false;
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(waitLock, timeout);
        } catch (KeeperException | InterruptedException | RuntimeException e) {
            logger.error("Error while trying lock with timeout" + e);
        }
        return false;
    }

    /**
     * Wait for the lock
     *
     * @param prev     name of the previous lock node
     * @param waitTime how long to wait
     * @return whether the wait completed
     * @throws KeeperException
     * @throws InterruptedException
     */
    private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
        Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);

        if (stat != null) {
            this.countDownLatch = new CountDownLatch(1);
            // Wait on the latch; once the previous node disappears, process() counts down and the lock can be acquired
            this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
            this.countDownLatch = null;
        }
        return true;
    }

    @Override
    public void unlock() {
        try {
            zk.delete(currentLock, -1);
            currentLock = null;
            zk.close();
        } catch (InterruptedException | KeeperException e) {
            logger.error("Error while releasing lock" + e);
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        public LockException(String e) {
            super(e);
        }

        public LockException(Exception e) {
            super(e);
        }
    }

}
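For reference, the removed DistributedLock follows the standard ZooKeeper ephemeral-sequential lock recipe and is used like a java.util.concurrent lock. A minimal usage sketch (the quorum address and lock name are placeholders); note that unlock() also closes the ZooKeeper session, so each instance is single-use:

// Usage sketch for the removed DistributedLock (placeholders throughout):
DistributedLock lock = new DistributedLock("zk1:2181,zk2:2181,zk3:2181", "workerId");
try {
    lock.lock();   // blocks until our ephemeral node is first under /locks
    // critical section, e.g. claiming a unique worker id for SnowflakeId
} finally {
    lock.unlock(); // deletes the node and closes the session (single-use lock)
}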
@@ -1,44 +0,0 @@
package com.zdjizhi.utils;

import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author wlh
 */
public class FlinkEnvironmentUtils {
    public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment();

    static {
        streamExeEnv.setParallelism(FlowWriteConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);

        /*
        // Start a checkpoint at a fixed interval (here: FLINK_WINDOW_MAX_TIME seconds)
        streamExeEnv.enableCheckpointing(CommonConfig.FLINK_WINDOW_MAX_TIME * 1000);

        // Set the mode to exactly-once (this is the default)
        streamExeEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Leave at least 500 ms between checkpoints
        streamExeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // A checkpoint must complete within one minute or it is discarded
        streamExeEnv.getCheckpointConfig().setCheckpointTimeout(60000);

        // Tolerate two consecutive checkpoint failures
        streamExeEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);

        // Allow only one checkpoint at a time
        streamExeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Use externalized checkpoints so they are retained after the job is cancelled
        streamExeEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Enable experimental unaligned checkpoints
        streamExeEnv.getCheckpointConfig().enableUnalignedCheckpoints();
        */
    }

}
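If the commented-out checkpointing were ever re-enabled, a minimal equivalent could look as follows. This is a sketch, not part of the change; the interval is illustrative, and the calls mirror the Flink CheckpointConfig API already referenced in the comments above:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: re-enabling the checkpointing configured above in comments.
class CheckpointingSketch {
    static void configure(StreamExecutionEnvironment env) {
        env.enableCheckpointing(5_000);                                // every 5 s (illustrative interval)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);  // at least 500 ms between checkpoints
        env.getCheckpointConfig().setCheckpointTimeout(60_000);        // abort checkpoints after one minute
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    }
}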
@@ -1,274 +0,0 @@
package com.zdjizhi.utils;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.geedgenetworks.utils.StringUtil;

import com.zdjizhi.common.FlowWriteConfig;
import org.apache.http.*;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;

import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * HTTP client utility class
 * @author wlh
 */
public class HttpClientUtils {
    /** Global connection pool */
    private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager();

    // private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
    private static final Log logger = LogFactory.get();
    public static final String ERROR_MESSAGE = "-1";

    /*
     * Static block: configure the connection pool
     */
    static {
        // Maximum total connections
        CONN_MANAGER.setMaxTotal(FlowWriteConfig.HTTP_POOL_MAX_CONNECTION);
        // Maximum connections per route
        CONN_MANAGER.setDefaultMaxPerRoute(FlowWriteConfig.HTTP_POOL_MAX_PER_ROUTE);
    }

    /**
     * Build an HTTP client backed by the shared pool
     * @return pooled HTTP client
     */
    private static CloseableHttpClient getHttpClient() {
        // Request configuration
        RequestConfig requestConfig = RequestConfig.custom()
                // Timeout for leasing a connection from the pool
                .setConnectionRequestTimeout(FlowWriteConfig.HTTP_POOL_REQUEST_TIMEOUT)
                // Connect timeout
                .setConnectTimeout(FlowWriteConfig.HTTP_POOL_CONNECT_TIMEOUT)
                // Socket (response) timeout
                .setSocketTimeout(FlowWriteConfig.HTTP_POOL_RESPONSE_TIMEOUT)
                .build();

        /*
         * Retry handler: decides per exception type whether a request
         * should be retried; returning false means no retry.
         */
        HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
            if (executionCount >= 3) {
                // Give up after 3 retries
                return false;
            }
            if (exception instanceof NoHttpResponseException) {
                // Retry if the server dropped the connection
                return true;
            }
            if (exception instanceof SSLHandshakeException) {
                // Do not retry SSL handshake failures
                return false;
            }
            if (exception instanceof UnknownHostException) {
                // Target host unreachable
                return false;
            }
            if (exception instanceof ConnectTimeoutException) {
                // Connect timed out
                return false;
            }
            if (exception instanceof HttpHostConnectException) {
                // Connection refused
                return false;
            }
            if (exception instanceof SSLException) {
                // SSL error
                return false;
            }
            if (exception instanceof InterruptedIOException) {
                // Timeout
                return true;
            }
            HttpClientContext clientContext = HttpClientContext.adapt(context);
            HttpRequest request = clientContext.getRequest();
            // Retry only if the request is idempotent (no enclosing entity)
            return !(request instanceof HttpEntityEnclosingRequest);
        };

        ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
            HeaderElementIterator it = new BasicHeaderElementIterator
                    (response.headerIterator(HTTP.CONN_KEEP_ALIVE));
            while (it.hasNext()) {
                HeaderElement he = it.nextElement();
                String param = he.getName();
                String value = he.getValue();
                if (value != null && "timeout".equalsIgnoreCase(param)) {
                    return Long.parseLong(value) * 1000;
                }
            }
            // Default keep-alive of 60 s when the server does not specify one
            return 60 * 1000;
        };

        // Build the client
        return HttpClients.custom()
                // Apply the request timeouts
                .setDefaultRequestConfig(requestConfig)
                // Apply the retry handler
                .setRetryHandler(retry)
                .setKeepAliveStrategy(myStrategy)
                // Use the pooled connection manager
                .setConnectionManager(CONN_MANAGER)
                .build();
    }

    /**
     * GET request
     *
     * @param uri request URI
     * @return response body
     */
    public static String httpGet(URI uri, Header... headers) {
        String msg = ERROR_MESSAGE;

        // Get a pooled client
        CloseableHttpClient httpClient = getHttpClient();
        CloseableHttpResponse response = null;

        try {
            logger.info("http get uri {}", uri);
            // Build the GET request
            HttpGet httpGet = new HttpGet(uri);

            if (StringUtil.isNotEmpty(headers)) {
                for (Header h : headers) {
                    httpGet.addHeader(h);
                    logger.info("request header : {}", h);
                }
            }
            // Execute the request
            response = httpClient.execute(httpGet);
            int statusCode = response.getStatusLine().getStatusCode();
            // Response entity
            HttpEntity entity = response.getEntity();
            // Response body
            msg = EntityUtils.toString(entity, "UTF-8");

            if (statusCode != HttpStatus.SC_OK) {
                logger.error("Http get content is :{}", msg);
            }

        } catch (ClientProtocolException e) {
            logger.error("Protocol error: {}", e.getMessage());
        } catch (ParseException e) {
            logger.error("Parse error: {}", e.getMessage());
        } catch (IOException e) {
            logger.error("I/O error: {}", e.getMessage());
        } finally {
            if (null != response) {
                try {
                    EntityUtils.consume(response.getEntity());
                    response.close();
                } catch (IOException e) {
                    logger.error("Error releasing connection: {}", e.getMessage());
                }
            }
        }

        return msg;
    }

    /**
     * POST request
     * @param uri request URI
     * @param requestBody request body
     * @return response body
     */
    public static String httpPost(URI uri, String requestBody, Header... headers) {
        String msg = ERROR_MESSAGE;
        // Get a pooled client
        CloseableHttpClient httpClient = getHttpClient();

        // Build the POST request
        CloseableHttpResponse response = null;
        try {
            logger.info("http post uri:{}, http post body:{}", uri, requestBody);

            HttpPost httpPost = new HttpPost(uri);
            httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
            if (StringUtil.isNotEmpty(headers)) {
                for (Header h : headers) {
                    httpPost.addHeader(h);
                    logger.info("request header : {}", h);
                }
            }

            if (StringUtil.isNotBlank(requestBody)) {
                byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8);
                httpPost.setEntity(new ByteArrayEntity(bytes));
            }

            response = httpClient.execute(httpPost);
            int statusCode = response.getStatusLine().getStatusCode();
            // Response entity
            HttpEntity entity = response.getEntity();
            // Response body
            msg = EntityUtils.toString(entity, "UTF-8");

            if (statusCode != HttpStatus.SC_OK) {
                logger.error("Http post content is :{}", msg);
            }
        } catch (ClientProtocolException e) {
            logger.error("Protocol error: {}", e.getMessage());
        } catch (ParseException e) {
            logger.error("Parse error: {}", e.getMessage());
        } catch (IOException e) {
            logger.error("I/O error: {}", e.getMessage());
        } finally {
            if (null != response) {
                try {
                    EntityUtils.consumeQuietly(response.getEntity());
                    response.close();
                } catch (IOException e) {
                    logger.error("Error releasing connection: {}", e.getMessage());
                }
            }
        }
        return msg;
    }

    /**
     * Assemble a URL from a path and a parameter map
     */
    public static void setUrlWithParams(URIBuilder uriBuilder, String path, Map<String, Object> params) {
        try {
            uriBuilder.setPath(path);
            if (params != null && !params.isEmpty()) {
                for (Map.Entry<String, Object> kv : params.entrySet()) {
                    uriBuilder.setParameter(kv.getKey(), kv.getValue().toString());
                }
            }
        } catch (Exception e) {
            logger.error("Failed to build URL, uri : {}, path : {}, params: {}", uriBuilder.toString(), path, params);
        }
    }

}
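A usage sketch for the removed HttpClientUtils; the host, path and parameter are placeholders, and URIBuilder.build() throws URISyntaxException, which the caller must handle:

// Usage sketch (placeholders throughout):
URIBuilder builder = new URIBuilder();
builder.setScheme("http").setHost("127.0.0.1").setPort(8080);
Map<String, Object> params = new HashMap<>();
params.put("profile_id", 1);
HttpClientUtils.setUrlWithParams(builder, "/api/v1/thresholds", params);
String body = HttpClientUtils.httpGet(builder.build());
if (HttpClientUtils.ERROR_MESSAGE.equals(body)) {
    // "-1" is the sentinel returned when the request failed after retries
}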
@@ -1,160 +0,0 @@
package com.zdjizhi.utils;

import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.*;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.geedgenetworks.utils.IpLookupV2;
import com.geedgenetworks.utils.StringUtil;
import com.google.common.base.Joiner;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.pojo.KnowlegeBaseMeta;
import com.zdjizhi.utils.connections.http.HttpClientService;
import com.zdjizhi.utils.connections.nacos.NacosConnection;

import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.concurrent.Executor;

/**
 * @author wangchengcheng
 * @version 2023/11/10 15:23
 */
public class IpLookupUtils {
    private static final Log logger = LogFactory.get();
    private static final String ipBuiltInName = "ip_builtin.mmdb";
    private static final String ipUserDefinedName = "ip_user_defined.mmdb";

    /**
     * IP geolocation database
     */
    private static IpLookupV2 ipLookup;

    /**
     * Default separator for database file names
     */
    private static final String LOCATION_SEPARATOR = ".";

    /**
     * Maximum number of download retries
     */
    private static final int TRY_TIMES = 5;

    /**
     * HTTP connections
     */
    private static final HttpClientService httpClientService;

    /**
     * Knowledge-base metadata cache
     */
    private static final HashMap<String, KnowlegeBaseMeta> knowledgeMetaCache = new HashMap<>(16);

    static {
        JSONPath jsonPath = JSONPath.of(getFilterParameter());
        httpClientService = new HttpClientService();

        NacosConnection nacosConnection = new NacosConnection();
        ConfigService schemaService = nacosConnection.getPublicService();
        try {
            String configInfo = schemaService.getConfigAndSignListener(FlowWriteConfig.NACOS_KNOWLEDGEBASE_DATA_ID, FlowWriteConfig.NACOS_PUBLIC_GROUP, FlowWriteConfig.NACOS_CONNECTION_TIMEOUT, new Listener() {
                @Override
                public Executor getExecutor() {
                    return null;
                }

                @Override
                public void receiveConfigInfo(String configInfo) {
                    if (StringUtil.isNotBlank(configInfo)) {
                        updateIpLookup(jsonPath, configInfo);
                    }
                }
            });

            if (StringUtil.isNotBlank(configInfo)) {
                updateIpLookup(jsonPath, configInfo);
            }
        } catch (NacosException e) {
            logger.error("Get Schema config from Nacos error, the exception message is: " + e.getMessage());
        }
    }

    private static void updateIpLookup(JSONPath jsonPath, String configInfo) {
        String extract = jsonPath.extract(JSONReader.of(configInfo)).toString();
        if (StringUtil.isNotBlank(extract)) {
            JSONArray jsonArray = JSON.parseArray(extract);
            if (jsonArray.size() > 0) {
                for (int i = 0; i < jsonArray.size(); i++) {
                    KnowlegeBaseMeta knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class);
                    String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(knowlegeBaseMeta.getName(), knowlegeBaseMeta.getFormat());
                    knowledgeMetaCache.put(fileName, knowlegeBaseMeta);
                }
                reloadIpLookup();
            }
        }
    }

    /**
     * Download the database files and rebuild the IpLookup
     */
    private static void reloadIpLookup() {
        IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
        for (String fileName : knowledgeMetaCache.keySet()) {
            int retryNum = 0;
            KnowlegeBaseMeta knowlegeBaseMeta = knowledgeMetaCache.get(fileName);
            String metaSha256 = knowlegeBaseMeta.getSha256();
            while (retryNum < TRY_TIMES) {
                System.out.println("download file :" + fileName + ",HOS path :" + knowlegeBaseMeta.getPath());
                Long startTime = System.currentTimeMillis();
                byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT);
                if (httpGetByte != null && httpGetByte.length > 0) {
                    String downloadFileSha256 = DigestUtil.sha256Hex(httpGetByte);
                    if (metaSha256.equals(downloadFileSha256)) {
                        ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte);
                        switch (fileName) {
                            case ipBuiltInName:
                                builder.loadDataFile(inputStream);
                                break;
                            case ipUserDefinedName:
                                builder.loadDataFilePrivate(inputStream);
                                break;
                            default:
                        }
                        System.out.println("update " + fileName + " finished, speed :" + (System.currentTimeMillis() - startTime) + "ms");
                        retryNum = TRY_TIMES;
                    } else {
                        logger.error("sha256 of {} downloaded via HOS is {}, but Nacos records {}; mismatch, starting retry #{}", fileName, downloadFileSha256, metaSha256, retryNum);
                        retryNum++;
                    }
                } else {
                    logger.error("Empty stream when downloading {} via HOS, starting retry #{}", fileName, retryNum);
                    retryNum++;
                }
            }
        }
        ipLookup = builder.build();
    }

    /**
     * Build the knowledge-base metadata filter from the configuration
     *
     * @return filter expression
     */
    private static String getFilterParameter() {
        // String expr = "$.[?(@.version=='latest' && @.name in ['ip_built_in','ip_user_defined'])].['name','sha256','format','path']";

        String expr = "[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined'))]";

        return expr;
    }

    public static String getCountryLookup(String ip) {
        return ipLookup.countryLookup(ip);
    }

}
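Callers of the removed IpLookupUtils (for example DosDetection above) guard the result with StringUtil.isNotBlank, since a miss in the mmdb yields a blank result. A one-line usage sketch (documentation IP):

String country = IpLookupUtils.getCountryLookup("198.51.100.7");  // blank when the mmdb has no entry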
@@ -1,35 +0,0 @@
package com.zdjizhi.utils;

import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Optional;
import java.util.Properties;

public class KafkaUtils {

    private static Properties getKafkaSinkProperty() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", FlowWriteConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
        if (FlowWriteConfig.SASL_JAAS_CONFIG_FLAG == 1) {
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.mechanism", "PLAIN");
            properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + FlowWriteConfig.SASL_JAAS_CONFIG_USER + "\" password=\"" + FlowWriteConfig.SASL_JAAS_CONFIG_PASSWORD + "\";");
        }

        return properties;
    }

    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                topic,
                new SimpleStringSchema(),
                getKafkaSinkProperty(),
                Optional.empty()
        );
        kafkaProducer.setLogFailuresOnly(true);
        return kafkaProducer;
    }

}
@@ -1,13 +1,18 @@
-package com.zdjizhi.utils;
+package com.zdjizhi.utils.Snowflakeld;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.flink.configuration.Configuration;
+
+import static com.zdjizhi.conf.DosConfigs.DATA_CENTER_ID_NUM;
+import static com.zdjizhi.conf.DosConfigs.HBASE_ZOOKEEPER_QUORUM;
 
 public class SnowflakeId {
-    // private static final Logger logger = LoggerFactory.getLogger(SnowflakeId.class);
     private static final Log logger = LogFactory.get();
 
+    private Configuration configuration;
+
     /**
      * 64 bits in total; the first bit is the sign bit and defaults to 0.
      * Timestamp: 39 bits (about 17 years); centerId (one per environment or job): 7 bits (0-127),
@@ -87,46 +92,32 @@ public class SnowflakeId {
      */
    private long lastTimestamp = -1L;
 
-    /**
-     * Maximum tolerated clock rollback: 10 s.
-     */
-    private static final long ROLL_BACK_TIME = 10000L;
-
-    private static SnowflakeId idWorker;
-
-    private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
-
-    static {
-        idWorker = new SnowflakeId(FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM, FlowWriteConfig.DATA_CENTER_ID_NUM);
-    }
+    private static final long rollBackTime = 10000L;
 
     //==============================Constructors=====================================
 
     /**
-     * Constructor
+     * Initialize the snowflake ID generator.
+     *
+     * @param dataCenterIdNum data-center number
+     * @param tmpWorkerId     worker number
      */
-    private SnowflakeId(String zookeeperIp, long dataCenterIdNum) {
-        DistributedLock lock = new DistributedLock(FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM, "disLocks1");
-        try {
-            lock.lock();
-            int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp);
-            if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
-                throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
-            }
-            if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
-                throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId));
-            }
-            this.workerId = tmpWorkerId;
-            this.dataCenterId = dataCenterIdNum;
-        } catch (RuntimeException e) {
-            logger.error("This is not usual error!!!===>>>" + e + "<<<===");
-        } finally {
-            lock.unlock();
-        }
-    }
+    public SnowflakeId(long dataCenterIdNum, long tmpWorkerId) {
+        if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
+            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
+        }
+        if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
+            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
+        }
+        this.workerId = tmpWorkerId;
+        this.dataCenterId = dataCenterIdNum;
+    }
 
     // ==============================Methods==========================================
 
     /**
@@ -134,10 +125,10 @@ public class SnowflakeId {
      *
      * @return SnowflakeId
      */
-    private synchronized long nextId() {
+    public synchronized long nextId() {
         long timestamp = timeGen();
         // Allow a bounded rollback window: if the system clock moved back by less than rollBackTime, wait for it to catch up.
-        if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < ROLL_BACK_TIME) {
+        if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) {
             timestamp = tilNextMillis(lastTimestamp);
         }
         // If the current time is still earlier than the last generated ID's timestamp, the clock rolled back too far; throw.
@@ -194,12 +185,4 @@ public class SnowflakeId {
     }
 
-    /**
-     * Static helper entry point.
-     */
-    public static Long generateId() {
-        return idWorker.nextId();
-    }
-
 }
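With the ZooKeeper-backed worker-id allocation removed, callers now construct the generator directly. A minimal usage sketch, assuming the worker id is derived from the Flink subtask index inside a RichFunction (this diff does not show the actual call site):

    // Sketch only: data-center id from configuration, worker id from the subtask index.
    SnowflakeId idWorker = new SnowflakeId(
            configuration.get(DATA_CENTER_ID_NUM),       // data-center number, 0-127
            getRuntimeContext().getIndexOfThisSubtask()  // worker number, must stay within maxWorkerId
    );
    long eventId = idWorker.nextId(); // synchronized; throws if the clock rolled back more than 10 s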
@@ -1,61 +1,64 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.utils.Threshold;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.geedgenetworks.utils.DateUtils;
-import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.common.DosBaselineThreshold;
-import com.zdjizhi.utils.HbaseUtils;
+import com.zdjizhi.utils.connections.hbase.HbaseUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
 import java.util.*;
 
+import static com.zdjizhi.conf.DosConfigs.*;
 
 public class ParseBaselineThreshold {
 
     private static final Log logger = LogFactory.get();
-    private static ArrayList<String> floodTypeList = new ArrayList<>();
-
-    private static Table table = null;
-    private static Scan scan = null;
-
-    static {
-        floodTypeList.add("TCP SYN Flood");
-        floodTypeList.add("UDP Flood");
-        floodTypeList.add("ICMP Flood");
-        floodTypeList.add("DNS Flood");
-    }
+    private ArrayList<String> floodTypeList = new ArrayList<>();
+    private Configuration configuration;
+    private Table table = null;
+    private Scan scan = null;
+
+    public ParseBaselineThreshold(Configuration configuration) {
+        this.configuration = configuration;
+        this.floodTypeList.add("TCP SYN Flood");
+        this.floodTypeList.add("UDP Flood");
+        this.floodTypeList.add("ICMP Flood");
+        this.floodTypeList.add("DNS Flood");
+    }
 
-    private static void prepareHbaseEnv() throws IOException {
-        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
-        config.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM);
+    private void prepareHbaseEnv() throws IOException {
+        final org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
+        config.set("hbase.zookeeper.quorum", configuration.get(HBASE_ZOOKEEPER_QUORUM));
         config.set("hbase.client.retries.number", "3");
         config.set("hbase.bulkload.retries.number", "3");
         config.set("zookeeper.recovery.retry", "3");
         config.set("hbase.defaults.for.version", "2.2.3");
         config.set("hbase.defaults.for.version.skip", "true");
-        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, FlowWriteConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
-        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, FlowWriteConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, configuration.get(HBASE_CLIENT_OPERATION_TIMEOUT));
+        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, configuration.get(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
 
-        TableName tableName = TableName.valueOf(FlowWriteConfig.HBASE_BASELINE_TABLE_NAME);
+        TableName tableName = TableName.valueOf(configuration.get(HBASE_BASELINE_TABLE_NAME));
         Connection conn = ConnectionFactory.createConnection(config);
         table = conn.getTable(tableName);
         long currentTimeMillis = System.currentTimeMillis();
         scan = new Scan()
                 .setAllowPartialResults(true)
-                .setTimeRange(DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(FlowWriteConfig.HBASE_BASELINE_TTL)).getTime(), currentTimeMillis)
-                .setLimit(FlowWriteConfig.HBASE_BASELINE_TOTAL_NUM);
+                .setTimeRange(DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(configuration.get(HBASE_BASELINE_TTL))).getTime(), currentTimeMillis)
+                .setLimit(configuration.get(HBASE_BASELINE_TOTAL_NUM));
         logger.info("Connected to HBase; reading baseline data");
     }
 
-    static Map<String, Map<String, DosBaselineThreshold>> readFromHbase() {
+    public Map<String, Map<String, DosBaselineThreshold>> readFromHbase() {
         Map<String, Map<String, DosBaselineThreshold>> baselineMap = new HashMap<>();
         try {
             prepareHbaseEnv();
@@ -64,16 +67,16 @@ public class ParseBaselineThreshold {
             for (Result result : rs) {
                 Map<String, DosBaselineThreshold> floodTypeMap = new HashMap<>();
                 String rowkey = Bytes.toString(result.getRow());
-                for (String type:floodTypeList){
+                for (String type : floodTypeList) {
                     DosBaselineThreshold baselineThreshold = new DosBaselineThreshold();
                     ArrayList<Integer> sessionRate = HbaseUtils.getArraylist(result, type, "session_rate");
-                    if (sessionRate != null && !sessionRate.isEmpty()){
+                    if (sessionRate != null && !sessionRate.isEmpty()) {
                         Integer defaultValue = HbaseUtils.getIntegerValue(result, type, "session_rate_default_value");
                         Integer rateBaselineType = HbaseUtils.getIntegerValue(result, type, "session_rate_baseline_type");
                         baselineThreshold.setSession_rate(sessionRate);
                         baselineThreshold.setSession_rate_default_value(defaultValue);
                         baselineThreshold.setSession_rate_baseline_type(rateBaselineType);
-                        floodTypeMap.put(type,baselineThreshold);
+                        floodTypeMap.put(type, baselineThreshold);
                     }
                 }
                 baselineMap.put(rowkey, floodTypeMap);
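The configuration.get(...) calls above assume DosConfigs exposes Flink ConfigOptions keyed like the properties files further down in this change. A hypothetical sketch of what those definitions might look like (defaults are assumptions taken from the old properties file):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    // Hypothetical sketch of the DosConfigs options referenced in this diff.
    public final class DosConfigs {
        public static final ConfigOption<String> HBASE_ZOOKEEPER_QUORUM =
                ConfigOptions.key("hbase.zookeeper.quorum").stringType().noDefaultValue();
        public static final ConfigOption<Integer> HBASE_BASELINE_TTL =
                ConfigOptions.key("hbase.baseline.ttl").intType().defaultValue(10); // days
        public static final ConfigOption<Integer> HBASE_BASELINE_TOTAL_NUM =
                ConfigOptions.key("hbase.baseline.total.num").intType().defaultValue(1_000_000);
    }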
@@ -1,22 +1,17 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.utils.Threshold;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
-//import com.fasterxml.jackson.databind.JavaType;
-import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.common.DosDetectionThreshold;
 import com.zdjizhi.common.DosVsysId;
-import com.zdjizhi.utils.HttpClientUtils;
-//import com.zdjizhi.utils.JsonMapper;
-
-import com.zdjizhi.utils.connections.nacos.NacosUtils;
+import com.zdjizhi.utils.connections.http.HttpClientService;
 import inet.ipaddr.IPAddress;
 import inet.ipaddr.IPAddressString;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
 import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
 
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.message.BasicHeader;
 
@@ -31,30 +26,35 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static com.zdjizhi.conf.DosConfigs.*;
 
 /**
  * @author wlh
  */
 public class ParseStaticThreshold {
     private static final Log logger = LogFactory.get();
-    private static String encryptpwd;
-
-    static {
-        // Load the encrypted login password.
-        encryptpwd = getEncryptpwd();
-    }
+    public Configuration configuration;
+    private String encryptpwd;
+    private HttpClientService httpClientService;
+
+    public ParseStaticThreshold(Configuration configuration, HttpClientService httpClientService) {
+        this.configuration = configuration;
+        this.httpClientService = httpClientService;
+    }
 
     /**
      * Fetch the encrypted password.
     */
-    private static String getEncryptpwd() {
-        String psw = HttpClientUtils.ERROR_MESSAGE;
+    private String getEncryptpwd() {
+        String psw = httpClientService.ERROR_MESSAGE;
         try {
-            URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI);
+            URIBuilder uriBuilder = new URIBuilder(configuration.get(BIFANG_SERVER_URI));
             HashMap<String, Object> parms = new HashMap<>();
             parms.put("password", "admin");
-            HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms);
-            String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build());
-            if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
+            httpClientService.setUrlWithParams(uriBuilder, configuration.get(BIFANG_SERVER_ENCRYPTPWD_PATH), parms);
+            String resposeJsonStr = httpClientService.httpGet(uriBuilder.build(), configuration.get(HTTP_SOCKET_TIMEOUT));
+            if (!httpClientService.ERROR_MESSAGE.equals(resposeJsonStr)) {
                 HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
                 boolean success = (boolean) resposeMap.get("success");
                 String msg = resposeMap.get("msg").toString();
@@ -80,21 +80,21 @@ public class ParseStaticThreshold {
      *
      * @return vsysIdList
      */
-    private static ArrayList<DosVsysId> getVsysId() {
+    private ArrayList<DosVsysId> getVsysId() {
         ArrayList<DosVsysId> vsysIdList = null;
         try {
-            URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI);
+            URIBuilder uriBuilder = new URIBuilder(configuration.get(BIFANG_SERVER_URI));
             HashMap<String, Object> parms = new HashMap<>();
             parms.put("page_size", -1);
             // parms.put("orderBy", "vsysId desc");
             parms.put("type", 1);
-            HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms);
-            String token = NacosUtils.getStringProperty("bifang.server.token");
-            if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
+            httpClientService.setUrlWithParams(uriBuilder, configuration.get(BIFANG_SERVER_POLICY_VSYSID_PATH), parms);
+            String token = configuration.get(BIFANG_SERVER_TOKEN);
+            if (!httpClientService.ERROR_MESSAGE.equals(token)) {
                 BasicHeader authorization = new BasicHeader("Authorization", token);
                 BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
-                String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
-                if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
+                String resposeJsonStr = httpClientService.httpGet(uriBuilder.build(), configuration.get(HTTP_SOCKET_TIMEOUT), authorization, authorization1);
+                if (!httpClientService.ERROR_MESSAGE.equals(resposeJsonStr)) {
                     HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
                     boolean success = (boolean) resposeMap.get("success");
                     String msg = resposeMap.get("msg").toString();
@@ -123,7 +123,7 @@ public class ParseStaticThreshold {
      * Fetch the static-threshold configuration list for each vsysId.
      * @return thresholds
     */
-    private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
+    private ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
         ArrayList<DosDetectionThreshold> vsysThresholds = new ArrayList<>();
         ArrayList<DosVsysId> vsysIds = getVsysId();
         try {
@@ -131,19 +131,19 @@ public class ParseStaticThreshold {
             for (DosVsysId dosVsysId : vsysIds) {
                 Integer vsysId = dosVsysId.getId() == null ? 1 : dosVsysId.getId();
                 Integer[] superiorIds = dosVsysId.getSuperior_ids();
-                URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI);
+                URIBuilder uriBuilder = new URIBuilder(configuration.get(BIFANG_SERVER_URI));
                 HashMap<String, Object> parms = new HashMap<>();
                 parms.put("page_size", -1);
                 // parms.put("order_by", "profileId asc");
                 parms.put("is_valid", 1);
                 parms.put("vsys_id", vsysId);
-                HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
-                String token = NacosUtils.getStringProperty("bifang.server.token");
-                if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
+                httpClientService.setUrlWithParams(uriBuilder, configuration.get(BIFANG_SERVER_POLICY_THRESHOLD_PATH), parms);
+                String token = configuration.get(BIFANG_SERVER_TOKEN);
+                if (!httpClientService.ERROR_MESSAGE.equals(token)) {
                     BasicHeader authorization = new BasicHeader("Authorization", token);
                     BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
-                    String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
-                    if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
+                    String resposeJsonStr = httpClientService.httpGet(uriBuilder.build(), configuration.get(HTTP_SOCKET_TIMEOUT), authorization, authorization1);
+                    if (!httpClientService.ERROR_MESSAGE.equals(resposeJsonStr)) {
                         HashMap<String, Object> resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class);
                         boolean success = (boolean) resposeMap.get("success");
                         String msg = resposeMap.get("msg").toString();
@@ -179,7 +179,7 @@ public class ParseStaticThreshold {
      *
      * @return threshold RangeMap
     */
-    static HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> createStaticThreshold() {
+    public HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> createStaticThreshold() {
        HashMap<Integer, HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>>> thresholdRangeMap = new HashMap<>(4);
         try {
             ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
@@ -189,10 +189,7 @@ public class ParseStaticThreshold {
                 String attackType = threshold.getAttack_type();
                 int vsysId = threshold.getVsys_id();
                 HashMap<String, TreeRangeMap<IPAddress, DosDetectionThreshold>> rangeMap = thresholdRangeMap.getOrDefault(vsysId, new HashMap<>());
-
                 TreeRangeMap<IPAddress, DosDetectionThreshold> treeRangeMap = rangeMap.getOrDefault(attackType, TreeRangeMap.create());
-
-
                 ArrayList<String> serverIpList = threshold.getServer_ip_list();
 
                 for (String sip : serverIpList) {
@@ -243,11 +240,11 @@ public class ParseStaticThreshold {
      *
      * @return token
     */
-    private static String loginBifangServer() {
-        String token = HttpClientUtils.ERROR_MESSAGE;
+    private String loginBifangServer() {
+        String token = httpClientService.ERROR_MESSAGE;
         try {
             final HashMap<String, Object> parmsMap = new HashMap<>();
-            String urlString = FlowWriteConfig.BIFANG_SERVER_URI+FlowWriteConfig.BIFANG_SERVER_LOGIN_PATH;
+            String urlString = configuration.get(BIFANG_SERVER_URI) + configuration.get(BIFANG_SERVER_LOGIN_PATH);
             parmsMap.put("username","admin");
             parmsMap.put("password",encryptpwd);
             parmsMap.put("auth_mode","");
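For context on the TreeRangeMap structure that createStaticThreshold builds: per attack type, CIDR-derived IP ranges map to a threshold object, and lookup is a point query. A minimal sketch, assuming each server_ip_list entry is stored as the closed lower/upper bounds of its CIDR block (the exact bounds used are not visible in this hunk):

    import inet.ipaddr.IPAddress;
    import inet.ipaddr.IPAddressString;
    import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
    import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;

    // Sketch: map a CIDR block to a threshold object, then look up a single address.
    TreeRangeMap<IPAddress, DosDetectionThreshold> map = TreeRangeMap.create();
    IPAddress block = new IPAddressString("10.0.0.0/24").getAddress();
    map.put(Range.closed(block.getLower(), block.getUpper()), threshold); // threshold: some DosDetectionThreshold
    DosDetectionThreshold hit = map.get(new IPAddressString("10.0.0.42").getAddress()); // null if no range matches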
@@ -1,136 +0,0 @@
-package com.zdjizhi.utils;
-
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import org.apache.zookeeper.*;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-public class ZookeeperUtils implements Watcher {
-    // private static final Logger logger = LoggerFactory.getLogger(ZookeeperUtils.class);
-    private static final Log logger = LogFactory.get();
-
-    private ZooKeeper zookeeper;
-
-    private static final int SESSION_TIME_OUT = 20000;
-
-    private CountDownLatch countDownLatch = new CountDownLatch(1);
-
-    @Override
-    public void process(WatchedEvent event) {
-        if (event.getState() == Event.KeeperState.SyncConnected) {
-            countDownLatch.countDown();
-        }
-    }
-
-    /**
-     * Update a node's value.
-     *
-     * @param path node path
-     */
-    int modifyNode(String path, String zookeeperIp) {
-        createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp);
-        int workerId = 0;
-        try {
-            connectZookeeper(zookeeperIp);
-            Stat stat = zookeeper.exists(path, true);
-            workerId = Integer.parseInt(getNodeDate(path));
-            if (workerId > 63) {
-                workerId = 0;
-                zookeeper.setData(path, "1".getBytes(), stat.getVersion());
-            } else {
-                String result = String.valueOf(workerId + 1);
-                if (stat != null) {
-                    zookeeper.setData(path, result.getBytes(), stat.getVersion());
-                } else {
-                    logger.error("Node does not exist!,Can't modify");
-                }
-            }
-        } catch (KeeperException | InterruptedException e) {
-            logger.error("modify error Can't modify," + e);
-        } finally {
-            closeConn();
-        }
-        logger.warn("workerID is:" + workerId);
-        return workerId;
-    }
-
-    /**
-     * Connect to ZooKeeper.
-     *
-     * @param host address
-     */
-    private void connectZookeeper(String host) {
-        try {
-            zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
-            countDownLatch.await();
-        } catch (IOException | InterruptedException e) {
-            logger.error("Connection to the Zookeeper Exception! message:" + e);
-        }
-    }
-
-    /**
-     * Close the connection.
-     */
-    private void closeConn() {
-        try {
-            if (zookeeper != null) {
-                zookeeper.close();
-            }
-        } catch (InterruptedException e) {
-            logger.error("Close the Zookeeper connection Exception! message:" + e);
-        }
-    }
-
-    /**
-     * Read a node's content.
-     *
-     * @param path node path
-     * @return content, or null on error
-     */
-    private String getNodeDate(String path) {
-        String result = null;
-        Stat stat = new Stat();
-        try {
-            byte[] resByte = zookeeper.getData(path, true, stat);
-            result = StrUtil.str(resByte, "UTF-8");
-        } catch (KeeperException | InterruptedException e) {
-            logger.error("Get node information exception" + e);
-        }
-        return result;
-    }
-
-    /**
-     * @param path node path to create
-     * @param date node payload as byte[]
-     * @param acls access-control policy
-     */
-    private void createNode(String path, byte[] date, List<ACL> acls, String zookeeperIp) {
-        try {
-            connectZookeeper(zookeeperIp);
-            Stat exists = zookeeper.exists(path, true);
-            if (exists == null) {
-                Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true);
-                if (existsSnowflakeld == null) {
-                    zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT);
-                }
-                zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
-            } else {
-                logger.warn("Node already exists ! Don't need to create");
-            }
-        } catch (KeeperException | InterruptedException e) {
-            logger.error(e.toString());
-        } finally {
-            closeConn();
-        }
-    }
-}
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils;
+package com.zdjizhi.utils.connections.hbase;
 
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -4,9 +4,9 @@ import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 
 import com.geedgenetworks.utils.StringUtil;
-import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.utils.exception.FlowWriteException;
 import org.apache.commons.io.IOUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.http.*;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.HttpRequestRetryHandler;
@@ -14,6 +14,7 @@ import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.config.Registry;
 import org.apache.http.config.RegistryBuilder;
 import org.apache.http.conn.ConnectTimeoutException;
@@ -34,14 +35,23 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
+import java.net.URI;
 import java.net.UnknownHostException;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.X509Certificate;
+import java.util.Map;
+
+import static com.zdjizhi.conf.DosConfigs.*;
 
 public class HttpClientService {
+    private static final Log logger = LogFactory.get();
+    public static final String ERROR_MESSAGE = "-1";
+    private Configuration configuration;
 
-    private static final Log log = LogFactory.get();
+    public HttpClientService(Configuration configuration) {
+        this.configuration = configuration;
+    }
 
     /**
      * Override the verification methods before making SSL calls, disabling SSL certificate checking.
@@ -75,9 +85,9 @@ public class HttpClientService {
             // Create the ConnectionManager and apply the connection settings.
             PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
             // Maximum number of pooled connections.
-            connManager.setMaxTotal(FlowWriteConfig.HTTP_POOL_MAX_CONNECTION);
+            connManager.setMaxTotal(configuration.get(HTTP_POOL_MAX_CONNECTION));
             // Maximum connections per route.
-            connManager.setDefaultMaxPerRoute(FlowWriteConfig.HTTP_POOL_MAX_PER_ROUTE);
+            connManager.setDefaultMaxPerRoute(configuration.get(HTTP_POOL_MAX_PER_ROUTE));
             return connManager;
         } catch (KeyManagementException | NoSuchAlgorithmException e) {
             throw new FlowWriteException(e.getMessage());
@@ -94,9 +104,9 @@ public class HttpClientService {
         // Build the HTTP request configuration.
         RequestConfig requestConfig = RequestConfig.custom()
                 // Timeout for leasing a connection from the pool.
-                .setConnectionRequestTimeout(FlowWriteConfig.HTTP_POOL_REQUEST_TIMEOUT)
+                .setConnectionRequestTimeout(configuration.get(HTTP_POOL_REQUEST_TIMEOUT))
                 // Connect timeout.
-                .setConnectTimeout(FlowWriteConfig.HTTP_POOL_CONNECT_TIMEOUT)
+                .setConnectTimeout(configuration.get(HTTP_POOL_CONNECT_TIMEOUT))
                 // Response (socket) timeout.
                 .setSocketTimeout(socketTimeOut)
                 .build();
@@ -192,13 +202,13 @@ public class HttpClientService {
             // Consume the response entity.
             EntityUtils.consume(response.getEntity());
         } catch (ClientProtocolException e) {
-            log.error("current file: {},Protocol error:{}", url, e.getMessage());
+            logger.error("current file: {},Protocol error:{}", url, e.getMessage());
         } catch (ParseException e) {
-            log.error("current file: {}, Parser error:{}", url, e.getMessage());
+            logger.error("current file: {}, Parser error:{}", url, e.getMessage());
         } catch (IOException e) {
-            log.error("current file: {},IO error:{}", url, e.getMessage());
+            logger.error("current file: {},IO error:{}", url, e.getMessage());
         } finally {
             if (null != response) {
@@ -206,7 +216,7 @@ public class HttpClientService {
                 EntityUtils.consume(response.getEntity());
                 response.close();
             } catch (IOException e) {
-                log.error("Release Connection error:{}", e.getMessage());
+                logger.error("Release Connection error:{}", e.getMessage());
             }
         }
@@ -236,13 +246,13 @@ public class HttpClientService {
             // Consume the response entity.
             EntityUtils.consume(response.getEntity());
         } catch (ClientProtocolException e) {
-            log.error("current file: {},Protocol error:{}", url, e.getMessage());
+            logger.error("current file: {},Protocol error:{}", url, e.getMessage());
         } catch (ParseException e) {
-            log.error("current file: {}, Parser error:{}", url, e.getMessage());
+            logger.error("current file: {}, Parser error:{}", url, e.getMessage());
         } catch (IOException e) {
-            log.error("current file: {},IO error:{}", url, e.getMessage());
+            logger.error("current file: {},IO error:{}", url, e.getMessage());
         } finally {
             if (null != response) {
@@ -250,7 +260,7 @@ public class HttpClientService {
                 EntityUtils.consume(response.getEntity());
                 response.close();
             } catch (IOException e) {
-                log.error("Release Connection error:{}", e.getMessage());
+                logger.error("Release Connection error:{}", e.getMessage());
             }
         }
@@ -258,4 +268,73 @@ public class HttpClientService {
         }
     }
 
+    /**
+     * GET request.
+     *
+     * @param uri request address
+     * @return message
+     */
+    public String httpGet(URI uri, int socketTimeout, Header... headers) {
+        String msg = ERROR_MESSAGE;
+        // Obtain a client from the pool.
+        CloseableHttpClient httpClient = getHttpClient(socketTimeout);
+        CloseableHttpResponse response = null;
+        try {
+            logger.info("http get uri {}", uri);
+            // Build the GET request.
+            HttpGet httpGet = new HttpGet(uri);
+            if (StringUtil.isNotEmpty(headers)) {
+                for (Header h : headers) {
+                    httpGet.addHeader(h);
+                    logger.info("request header : {}", h);
+                }
+            }
+            // Execute the request.
+            response = httpClient.execute(httpGet);
+            int statusCode = response.getStatusLine().getStatusCode();
+            // Read the response entity.
+            HttpEntity entity = response.getEntity();
+            msg = EntityUtils.toString(entity, "UTF-8");
+            if (statusCode != HttpStatus.SC_OK) {
+                logger.error("Http get content is :{}", msg);
+            }
+        } catch (ClientProtocolException e) {
+            logger.error("Protocol error: {}", e.getMessage());
+        } catch (ParseException e) {
+            logger.error("Parse error: {}", e.getMessage());
+        } catch (IOException e) {
+            logger.error("IO error: {}", e.getMessage());
+        } finally {
+            if (null != response) {
+                try {
+                    EntityUtils.consume(response.getEntity());
+                    response.close();
+                } catch (IOException e) {
+                    logger.error("Release connection error: {}", e.getMessage());
+                }
+            }
+        }
+        return msg;
+    }
+
+    public void setUrlWithParams(URIBuilder uriBuilder, String path, Map<String, Object> params) {
+        try {
+            uriBuilder.setPath(path);
+            if (params != null && !params.isEmpty()) {
+                for (Map.Entry<String, Object> kv : params.entrySet()) {
+                    uriBuilder.setParameter(kv.getKey(), kv.getValue().toString());
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Error building url, uri : {}, path : {}, params: {}", uriBuilder.toString(), path, params);
+        }
+    }
 }
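A usage sketch for the new instance-based client; the host and path mirror values in the configuration files elsewhere in this change, and the token is a placeholder:

    // Sketch: build a URI with query parameters, then issue an authenticated GET.
    static String fetchVsysList(HttpClientService http) throws java.net.URISyntaxException {
        URIBuilder builder = new URIBuilder("http://192.168.44.3"); // bifang.server.uri
        Map<String, Object> params = new HashMap<>();
        params.put("page_size", -1);
        http.setUrlWithParams(builder, "/v1/admin/vsys", params);   // bifang.server.policy.vaysid.path
        // 90000 ms mirrors http.socket.timeout; the Authorization value is a placeholder.
        return http.httpGet(builder.build(), 90000, new BasicHeader("Authorization", "<token>"));
    }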
@@ -0,0 +1,17 @@
+package com.zdjizhi.utils.connections.kafka;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import java.util.Properties;
+
+public class KafkaConsumer {
+
+    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, Properties properties) {
+        final FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
+        return kafkaConsumer;
+    }
+
+}
@@ -0,0 +1,23 @@
+package com.zdjizhi.utils.connections.kafka;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+import java.util.Optional;
+import java.util.Properties;
+
+public class KafkaProducer {
+
+    public static FlinkKafkaProducer<String> getKafkaProducer(String topic, Properties properties) {
+        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
+                topic,
+                new SimpleStringSchema(),
+                properties,
+                Optional.empty()
+        );
+        kafkaProducer.setLogFailuresOnly(true);
+        return kafkaProducer;
+    }
+}
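A sketch of how the two helpers might be wired into the job. Topic names and servers mirror detection_dos_attack.properties in this change; the job wiring itself is an assumption, not shown in the diff:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import com.zdjizhi.utils.connections.kafka.KafkaConsumer;
    import com.zdjizhi.utils.connections.kafka.KafkaProducer;

    import java.util.Properties;

    public class DosDetectionJobSketch {
        public static void main(String[] args) throws Exception {
            // Values mirror detection_dos_attack.properties.
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "192.168.44.12:9094");
            consumerProps.put("group.id", "dos-detection-job-20240116");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> records =
                    env.addSource(KafkaConsumer.getKafkaConsumer("DOS-SKETCH-RECORD", consumerProps));

            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "192.168.44.12:9094");
            records.addSink(KafkaProducer.getKafkaProducer("DOS-EVENT", producerProps));

            env.execute("DOS-DETECTION-APPLICATION");
        }
    }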
@@ -1,54 +0,0 @@
-package com.zdjizhi.utils.connections.nacos;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.zdjizhi.common.FlowWriteConfig;
-
-import java.util.Properties;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.tools.connections.nacos
- * @Description:
- * @date 2023/7/27 14:49
- */
-public class NacosConnection {
-    private static final Log logger = LogFactory.get();
-
-    private ConfigService configService;
-
-    public ConfigService getDosService() {
-        Properties properties = new Properties();
-        properties.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
-        properties.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_DOS_NAMESPACE);
-        properties.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
-        properties.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
-        try {
-            configService = NacosFactory.createConfigService(properties);
-        } catch (NacosException e) {
-            logger.error("NacosException:{}", e);
-        }
-        return configService;
-    }
-
-    public ConfigService getPublicService() {
-        Properties properties = new Properties();
-        properties.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
-        properties.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_PUBLIC_NAMESPACE);
-        properties.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
-        properties.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
-        try {
-            configService = NacosFactory.createConfigService(properties);
-        } catch (NacosException e) {
-            logger.error("NacosException:{}", e);
-        }
-        return configService;
-    }
-}
@@ -1,69 +0,0 @@
-package com.zdjizhi.utils.connections.nacos;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.zdjizhi.common.FlowWriteConfig;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-public class NacosUtils {
-    private static final Log logger = LogFactory.get();
-    private static Properties commonProperties = new Properties();
-
-    static {
-        NacosConnection nacosConnection = new NacosConnection();
-        ConfigService dosService = nacosConnection.getDosService();
-        try {
-            String config = dosService.getConfig(FlowWriteConfig.NACOS_DOS_DATA_ID, FlowWriteConfig.NACOS_DOS_GROUP, FlowWriteConfig.NACOS_CONNECTION_TIMEOUT);
-            commonProperties.load(new StringReader(config));
-            dosService.addListener(FlowWriteConfig.NACOS_DOS_DATA_ID, FlowWriteConfig.NACOS_DOS_GROUP, new Listener() {
-                @Override
-                public Executor getExecutor() {
-                    return null;
-                }
-
-                @Override
-                public void receiveConfigInfo(String configMsg) {
-                    try {
-                        commonProperties.clear();
-                        commonProperties.load(new StringReader(configMsg));
-                    } catch (IOException e) {
-                        logger.error("Failed to listen for Nacos configuration", e);
-                    }
-                    System.out.println(configMsg);
-                }
-            });
-        } catch (Exception e) {
-            e.printStackTrace();
-            logger.error("Failed to fetch Nacos configuration", e);
-        }
-    }
-
-    public static String getStringProperty(String key) {
-        return commonProperties.getProperty(key);
-    }
-
-    public static Integer getIntProperty(String key) {
-        return Integer.parseInt(commonProperties.getProperty(key));
-    }
-
-    public static Double getDoubleProperty(String key) {
-        return Double.parseDouble(commonProperties.getProperty(key));
-    }
-
-    public static Long getLongProperty(String key) {
-        return Long.parseLong(commonProperties.getProperty(key));
-    }
-
-    public static Boolean getBooleanProperty(String key) {
-        return "true".equals(commonProperties.getProperty(key).toLowerCase().trim());
-    }
-}
@@ -1,8 +1,6 @@
 package com.zdjizhi.utils.exception;
 
-
-
 public class FlowWriteException extends RuntimeException {
 
     public FlowWriteException() {
     }
180 src/main/java/com/zdjizhi/utils/knowledgebase/IpLookupUtils.java Normal file
@@ -0,0 +1,180 @@
+package com.zdjizhi.utils.knowledgebase;
+
+import cn.hutool.crypto.digest.DigestUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.*;
+import com.geedgenetworks.utils.IpLookupV2;
+import com.geedgenetworks.utils.StringUtil;
+import com.google.common.base.Joiner;
+import com.zdjizhi.common.pojo.KnowlegeBaseMeta;
+import com.zdjizhi.utils.connections.http.HttpClientService;
+import org.apache.flink.configuration.Configuration;
+import org.apache.http.client.utils.URIBuilder;
+
+import java.io.ByteArrayInputStream;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.zdjizhi.conf.DosConfigs.*;
+
+/**
+ * @author wangchengcheng
+ * @version 2023/11/10 15:23
+ */
+public class IpLookupUtils {
+    private static final Log logger = LogFactory.get();
+    private static final String ipBuiltInName = "ip_builtin.mmdb";
+    private static final String ipUserDefinedName = "ip_user_defined.mmdb";
+
+    private Configuration configuration;
+
+    /**
+     * IP location database.
+     */
+    private static IpLookupV2 ipLookup;
+
+    /**
+     * Default separator for the location database.
+     */
+    private static final String LOCATION_SEPARATOR = ".";
+
+    /**
+     * Maximum number of retries.
+     */
+    private static final int TRY_TIMES = 5;
+
+    /**
+     * HTTP connections.
+     */
+    private final HttpClientService httpClientService;
+
+    /**
+     * Cache of location-database metadata.
+     */
+    private static final HashMap<String, KnowlegeBaseMeta> knowledgeMetaCache = new HashMap<>(16);
+
+    private static String currentSha256IpUserDefined = "";
+
+    private static String currentSha256IpBuiltin = "";
+
+    public IpLookupUtils(Configuration configuration, HttpClientService httpClientService) {
+        this.configuration = configuration;
+        this.httpClientService = httpClientService;
+    }
+
+    public void stuffKnowledgeMetaCache() {
+        KnowlegeBaseMeta ipBuiltinknowlegeBaseMeta = getKnowlegeBaseMeta(configuration.get(IP_BUILTIN_KD_ID));
+        if (!currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256())) {
+            String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipBuiltinknowlegeBaseMeta.getName(), ipBuiltinknowlegeBaseMeta.getFormat());
+            knowledgeMetaCache.put(fileName, ipBuiltinknowlegeBaseMeta);
+        }
+        final KnowlegeBaseMeta ipUserDefinedknowlegeBaseMeta = getKnowlegeBaseMeta(configuration.get(IP_USER_DEFINED_KD_ID));
+        if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256())) {
+            String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipUserDefinedknowlegeBaseMeta.getName(), ipUserDefinedknowlegeBaseMeta.getFormat());
+            knowledgeMetaCache.put(fileName, ipUserDefinedknowlegeBaseMeta);
+        }
+        if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256()) || !currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256())) {
+            currentSha256IpBuiltin = ipBuiltinknowlegeBaseMeta.getSha256();
+            currentSha256IpUserDefined = ipUserDefinedknowlegeBaseMeta.getSha256();
+            reloadIpLookup();
+            logger.debug("Location databases reloaded; sha256 of ip_builtin.mmdb: " + currentSha256IpBuiltin, "sha256 of ip_user_defined.mmdb: " + currentSha256IpUserDefined);
+        }
+    }
+
+    /**
+     * Download the files from HDFS and rebuild the IpLookup.
+     */
+    private void reloadIpLookup() {
+        IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
+        for (String fileName : knowledgeMetaCache.keySet()) {
+            int retryNum = 0;
+            KnowlegeBaseMeta knowlegeBaseMeta = knowledgeMetaCache.get(fileName);
+            String metaSha256 = knowlegeBaseMeta.getSha256();
+            while (retryNum < TRY_TIMES) {
+                System.out.println("download file :" + fileName + ",HOS path :" + knowlegeBaseMeta.getPath());
+                Long startTime = System.currentTimeMillis();
+                byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), configuration.get(HTTP_SOCKET_TIMEOUT));
+                if (httpGetByte != null && httpGetByte.length > 0) {
+                    String downloadFileSha256 = DigestUtil.sha256Hex(httpGetByte);
+                    if (metaSha256.equals(downloadFileSha256)) {
+                        ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte);
+                        switch (fileName) {
+                            case ipBuiltInName:
+                                builder.loadDataFile(inputStream);
+                                break;
+                            case ipUserDefinedName:
+                                builder.loadDataFilePrivate(inputStream);
+                                break;
+                            default:
+                        }
+                        System.out.println("update " + fileName + " finished, speed :" + (System.currentTimeMillis() - startTime) + "ms");
+                        retryNum = TRY_TIMES;
+                    } else {
+                        logger.error("sha256 of {} downloaded via HOS is {}, but the gateway records {}; mismatch, starting retry #{}", fileName, downloadFileSha256, metaSha256, retryNum);
+                        retryNum++;
+                    }
+                } else {
+                    logger.error("Stream for {} downloaded via HOS is empty; starting retry #{}", fileName, retryNum);
+                    retryNum++;
+                }
+            }
+        }
+        ipLookup = builder.build();
+    }
+
+    /**
+     * Build the knowledge-base metadata filter parameter from the configured combination.
+     *
+     * @return filter parameter
+     */
+    private String getFilterParameter() {
+        String expr = "[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined'))]";
+        return expr;
+    }
+
+    private KnowlegeBaseMeta getKnowlegeBaseMeta(String kd_id) {
+        KnowlegeBaseMeta knowlegeBaseMeta = null;
+        String knowledgeInfo = null;
+        try {
+            URIBuilder uriBuilder = new URIBuilder(configuration.get(KNOWLEDGE_BASE_URL));
+            HashMap<String, Object> parms = new HashMap<>();
+            parms.put("kb_id", kd_id);
+            httpClientService.setUrlWithParams(uriBuilder, configuration.get(KNOWLEDGE_BASE_PATH), parms);
+            knowledgeInfo = httpClientService.httpGet(uriBuilder.build(), configuration.get(HTTP_SOCKET_TIMEOUT));
+            if (knowledgeInfo.contains("200")) {
+                final Map<String, Object> jsonObject = JSONObject.parseObject(knowledgeInfo, Map.class);
+                logger.debug("Fetched knowledge_base for kd_id [" + kd_id + "]; response: " + jsonObject);
+                JSONPath jsonPath = JSONPath.of(getFilterParameter());
+                String extract = jsonPath.extract(JSONReader.of(jsonObject.get("data").toString())).toString();
+                if (StringUtil.isNotBlank(extract)) {
+                    JSONArray jsonArray = JSON.parseArray(extract);
+                    if (jsonArray.size() > 0) {
+                        for (int i = 0; i < jsonArray.size(); i++) {
+                            knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class);
+                        }
+                    }
+                }
+            } else {
+                logger.error("Failed to fetch knowledge_base; response: " + knowledgeInfo);
+            }
+        } catch (URISyntaxException e) {
+            logger.error("Malformed URI", e);
+        } catch (Exception e) {
+            logger.error("Failed to fetch knowledge_base", e);
+        }
+        return knowlegeBaseMeta;
+    }
+
+    public String getCountryLookup(String ip) {
+        if (ipLookup != null) {
+            return ipLookup.countryLookup(ip);
+        } else {
+            return null;
+        }
+    }
+}
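The diff does not show where stuffKnowledgeMetaCache() is invoked; one plausible arrangement is a background refresh, sketched here with an assumed 10-minute interval (configuration and httpClientService are assumed to be in scope):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Sketch: refresh the location databases periodically in the background.
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    IpLookupUtils ipLookupUtils = new IpLookupUtils(configuration, httpClientService);
    scheduler.scheduleAtFixedRate(ipLookupUtils::stuffKnowledgeMetaCache, 0, 10, TimeUnit.MINUTES);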
@@ -1,139 +0,0 @@
|
|||||||
#flink运行环境并行度,其优先级低于算子并行度,如果未设置算子并行度,则使用该数值
|
|
||||||
stream.execution.environment.parallelism=1
|
|
||||||
|
|
||||||
#flink任务名,一般不变
|
|
||||||
stream.execution.job.name=DOS-DETECTION-APPLICATION
|
|
||||||
|
|
||||||
#输入kafka并行度大小
|
|
||||||
kafka.input.parallelism=3
|
|
||||||
|
|
||||||
#输入kafka topic名
|
|
||||||
kafka.input.topic.name=test
|
|
||||||
|
|
||||||
#输入kafka地址
|
|
||||||
kafka.input.bootstrap.servers=192.168.44.12:9094
|
|
||||||
#kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
|
|
||||||
|
|
||||||
#读取kafka group id
|
|
||||||
kafka.input.group.id=dos-detection-job-221125-23132
|
|
||||||
#kafka.input.group.id=dos-detection-job-210813-1
|
|
||||||
|
|
||||||
#发送kafka metrics并行度大小
|
|
||||||
kafka.output.metric.parallelism=3
|
|
||||||
|
|
||||||
#发送kafka metrics topic名
|
|
||||||
#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
|
|
||||||
kafka.output.metric.topic.name=test
|
|
||||||
|
|
||||||
#发送kafka event并行度大小
|
|
||||||
kafka.output.event.parallelism=3
|
|
||||||
|
|
||||||
#发送kafka event topic名
|
|
||||||
#kafka.output.event.topic.name=DOS-EVENT
|
|
||||||
kafka.output.event.topic.name=dos-test
|
|
||||||
|
|
||||||
#kafka输出地址
|
|
||||||
kafka.output.bootstrap.servers=192.168.44.12:9094
|
|
||||||
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
|
|
||||||
|
|
||||||
#zookeeper地址
|
|
||||||
hbase.zookeeper.quorum=192.168.44.12:2181
|
|
||||||
#hbase.zookeeper.quorum=192.168.40.151:2181,192.168.40.152:2181,192.168.40.203:2181
|
|
||||||
#hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
|
|
||||||
|
|
||||||
#hbase客户端处理时间
|
|
||||||
hbase.client.operation.timeout=30000
|
|
||||||
hbase.client.scanner.timeout.period=30000
|
|
||||||
|
|
||||||
##hbase baseline表名
|
|
||||||
hbase.baseline.table.name=dos:ddos_traffic_baselines
|
|
||||||
|
|
||||||
#读取baseline限制
|
|
||||||
hbase.baseline.total.num=1000000
|
|
||||||
|
|
||||||
#baseline ttl,单位:天
|
|
||||||
hbase.baseline.ttl=10
|
|
||||||
|
|
||||||
#设置聚合并行度,2个key
|
|
||||||
flink.first.agg.parallelism=1
|
|
||||||
|
|
||||||
#设置结果判定并行度
|
|
||||||
flink.detection.map.parallelism=1
|
|
||||||
|
|
||||||
#watermark延迟
|
|
||||||
flink.watermark.max.orderness=300
|
|
||||||
|
|
||||||
#计算窗口大小,默认600s
|
|
||||||
flink.window.max.time=60
|
|
||||||
|
|
||||||
#dos event结果中distinct source IP限制
|
|
||||||
source.ip.list.limit=10000
|
|
||||||
|
|
||||||
#基于目的IP的分区数,默认为10000,一般不变
|
|
||||||
destination.ip.partition.num=10000
|
|
||||||
|
|
||||||
data.center.id.num=15
|
|
||||||
|
|
||||||
|
|
||||||
#bifang服务访问地址
|
|
||||||
bifang.server.uri=http://192.168.44.72
|
|
||||||
#bifang.server.uri=http://192.168.44.3:80
|
|
||||||
|
|
||||||
#加密密码路径信息
|
|
||||||
bifang.server.encryptpwd.path=/v1/user/encryptpwd
|
|
||||||
|
|
||||||
#登录bifang服务路径信息
|
|
||||||
bifang.server.login.path=/v1/user/login
|
|
||||||
|
|
||||||
#获取vaysId路径信息
|
|
||||||
bifang.server.policy.vaysid.path=/v1/admin/vsys
|
|
||||||
|
|
||||||
#获取静态阈值路径信息
|
|
||||||
bifang.server.policy.threshold.path=/v1/policy/profile/dos_detection
|
|
||||||
|
|
||||||
#http请求相关参数
|
|
||||||
#最大连接数
|
|
||||||
http.pool.max.connection=400
|
|
||||||
|
|
||||||
#单路由最大连接数
|
|
||||||
http.pool.max.per.route=80
|
|
||||||
|
|
||||||
#向服务端请求超时时间设置(单位:毫秒)
|
|
||||||
http.pool.request.timeout=60000
|
|
||||||
|
|
||||||
#向服务端连接超时时间设置(单位:毫秒)
|
|
||||||
http.pool.connect.timeout=60000
|
|
||||||
|
|
||||||
#服务端响应超时时间设置(单位:毫秒)
|
|
||||||
http.pool.response.timeout=60000
|
|
||||||
|
|
||||||
#获取静态阈值周期,默认十分钟
|
|
||||||
static.threshold.schedule.minutes=10
|
|
||||||
|
|
||||||
#获取baseline周期,默认7天
|
|
||||||
baseline.threshold.schedule.days=1
|
|
||||||
|
|
||||||
#kafka用户认证配置参数
|
|
||||||
sasl.jaas.config.user=admin
|
|
||||||
#sasl.jaas.config.password=galaxy2019
|
|
||||||
sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
|
|
||||||
|
|
||||||
#是否开启kafka用户认证配置,1:是;0:否
|
|
||||||
sasl.jaas.config.flag=1
|
|
||||||
|
|
||||||
############################## Nacos configuration ######################################
nacos.server.addr=192.168.44.12:8848
nacos.username=nacos
nacos.password=nacos

############################## Nacos --- knowledge-base configuration ######################################
nacos.namespace=public
nacos.data.id=knowledge_base.json
nacos.group=DEFAULT_GROUP
nacos.connection.timeout=60000

############################## Nacos --- static-threshold configuration ######################################
nacos.dos.namespace=test
nacos.dos.data.id=dos_detection.properties
nacos.dos.group=Galaxy
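The nacos.dos.* block points the job at a dos_detection.properties hosted in Nacos; reading it back mirrors what NacosTest does further down in this diff. A sketch using the Nacos Java client, with the connection values taken from the keys above:

import java.io.StringReader;
import java.util.Properties;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;

public class NacosReadSketch {
    public static Properties load() throws Exception {
        Properties conn = new Properties();
        conn.put("serverAddr", "192.168.44.12:8848"); // nacos.server.addr
        conn.put("username", "nacos");                // nacos.username
        conn.put("password", "nacos");                // nacos.password
        conn.put("namespace", "test");                // nacos.dos.namespace

        ConfigService cs = NacosFactory.createConfigService(conn);
        // data id / group from the nacos.dos.* keys; timeout from nacos.connection.timeout
        String content = cs.getConfig("dos_detection.properties", "Galaxy", 60000);
        Properties thresholds = new Properties();
        thresholds.load(new StringReader(content));
        return thresholds;
    }
}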
http.socket.timeout=90000

51
src/main/resources/detection_dos_attack.properties
Normal file
@@ -0,0 +1,51 @@
source.kafka.topic=DOS-SKETCH-RECORD
source.kafka.props.bootstrap.servers=192.168.44.12:9094
source.kafka.props.group.id=dos-detection-job-20240116
source.kafka.props.session.timeout.ms=60000
source.kafka.props.max.poll.records=5000
source.kafka.props.max.partition.fetch.bytes=31457280
source.kafka.props.security.protocol=SASL_PLAINTEXT
source.kafka.props.sasl.mechanism=PLAIN
source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
# Kafka sink
kafka.sink.event.topic.name=DOS-EVENT
kafka.sink.metric.topic=TRAFFIC-TOP-DESTINATION-IP-METRIC
sink.kafka.props.bootstrap.servers=192.168.44.12:9094
sink.kafka.props.security.protocol=SASL_PLAINTEXT
sink.kafka.props.sasl.mechanism=PLAIN
sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
sink.kafka.props.acks=1
sink.kafka.props.retries=0
sink.kafka.props.linger.ms=10
sink.kafka.props.request.timeout.ms=30000
sink.kafka.props.batch.size=262144
sink.kafka.props.buffer.memory=134217728
sink.kafka.props.max.request.size=10485760
sink.kafka.props.compression.type=snappy
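Both blocks above follow a source.kafka.props. / sink.kafka.props. prefix convention. The convention is visible in the file; the loader below is one plausible way to strip the prefix and hand the remainder to a Kafka client, not the project's confirmed implementation:

import java.util.Properties;

public class PrefixedPropsSketch {
    // Copies every key that starts with `prefix` into a new Properties,
    // with the prefix removed, e.g. "sink.kafka.props.acks" -> "acks".
    public static Properties stripPrefix(Properties all, String prefix) {
        Properties out = new Properties();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                out.put(key.substring(prefix.length()), all.getProperty(key));
            }
        }
        return out;
    }
}

Usage: stripPrefix(fileProps, "sink.kafka.props.") yields exactly the acks/retries/linger.ms/... map that a KafkaProducer constructor accepts.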
ip.user.defined.kd.id=dasdasdsad

# ZooKeeper address
hbase.zookeeper.quorum=192.168.44.12:2181

flink.watermark.max.orderness=30

# Computation window size in seconds (default 600)
flink.window.max.time=60

# CM service base URI
bifang.server.uri=http://192.168.44.3

knowledge.base.uri=http://192.168.45.102:9999
############################## Threshold configuration ######################################
static.sensitivity.threshold=1
# Baseline sensitivity threshold
baseline.sensitivity.threshold=0.2
# Bounds for judging a DoS attack against the baseline
baseline.sessions.minor.threshold=0.2
baseline.sessions.warning.threshold=1
baseline.sessions.major.threshold=2.5
baseline.sessions.severe.threshold=5
baseline.sessions.critical.threshold=8
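The five baseline.sessions.*.threshold values read as multipliers of the learned baseline. An illustrative mapping from the observed/baseline ratio to a severity label; only the constants come from the file, the mapping direction itself is an assumption:

public class SeveritySketch {
    // Thresholds from detection_dos_attack.properties; ordering assumed.
    public static String severity(double ratioOverBaseline) {
        if (ratioOverBaseline >= 8)   return "critical";
        if (ratioOverBaseline >= 5)   return "severe";
        if (ratioOverBaseline >= 2.5) return "major";
        if (ratioOverBaseline >= 1)   return "warning";
        if (ratioOverBaseline >= 0.2) return "minor";
        return "none";
    }
}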
@@ -1,50 +0,0 @@
package com.zdjizhi.Http;

import com.alibaba.fastjson2.JSON;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.HttpClientUtils;

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap;

public class HttpTest {
    public static void main(String[] args) {
        String token = HttpClientUtils.ERROR_MESSAGE;
        try {
            String urlString = FlowWriteConfig.BIFANG_SERVER_URI + "/v1/user/encryptpwd";
            final HashMap<String, Object> parmsMap = new HashMap<>();
            parmsMap.put("username", "admin");

            final String jsonInputString = JSON.toJSONString(parmsMap);
            System.out.println("URL:" + urlString);
            System.out.println("parmsString:" + jsonInputString);

            final URL url = new URL(urlString);
            final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setRequestProperty("Accept", "application/json");
            connection.setDoOutput(true);
            OutputStream os = connection.getOutputStream();
            os.write(jsonInputString.getBytes());
            os.flush();
            os.close();

            int responseCode = connection.getResponseCode();
            System.out.println("Response Code: " + responseCode);
        } catch (Exception e) {
            System.out.println("Request failed");
        }
    }
}
@@ -19,12 +19,12 @@ public class HbaseTest {
     public static void main(String[] args) throws IOException {
         org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();

-        config.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM);
-        config.set("hbase.client.retries.number", "3");
-        config.set("hbase.bulkload.retries.number", "3");
-        config.set("zookeeper.recovery.retry", "3");
-        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, FlowWriteConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
-        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, FlowWriteConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+        // config.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM);
+        // config.set("hbase.client.retries.number", "3");
+        // config.set("hbase.bulkload.retries.number", "3");
+        // config.set("zookeeper.recovery.retry", "3");
+        // config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, FlowWriteConfig.HBASE_CLIENT_OPERATION_TIMEOUT);
+        // config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, FlowWriteConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

         TableName tableName = TableName.valueOf("dos_test");
         Connection conn = ConnectionFactory.createConnection(config);
@@ -55,7 +55,7 @@ public class NacosTest {
             String content = configService.getConfig(DATA_ID, GROUP, 5000);
             Properties nacosConfigMap = new Properties();
             nacosConfigMap.load(new StringReader(content));
-            System.out.println(nacosConfigMap.getProperty("static.sensitivity.threshold"));
+            // System.out.println(FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD);
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -5,8 +5,6 @@ import com.zdjizhi.common.DosDetectionThreshold;
 import com.zdjizhi.common.DosEventLog;
 import com.zdjizhi.common.DosSketchLog;

-import com.zdjizhi.utils.IpLookupUtils;
-import com.zdjizhi.utils.SnowflakeId;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;

@@ -34,13 +32,13 @@ public class DosDetectionTest {
         serverIpList.add("192.168.50.1/24");
         serverIpList.add("FC::12:0:0/54");
         serverIpList.add("FC::12:0:0");
-        dosDetectionThreshold.setProfile_id(4437);
+        dosDetectionThreshold.setId(4437);
         dosDetectionThreshold.setAttack_type("DNS Flood");
         dosDetectionThreshold.setServer_ip_list(serverIpList);
         dosDetectionThreshold.setSessions_per_sec(1);
         dosDetectionThreshold.setPackets_per_sec(1);
         dosDetectionThreshold.setBits_per_sec(100000);
-        dosDetectionThreshold.setIs_valid(1);
+        dosDetectionThreshold.setIs_enabled(1);
         dosDetectionThreshold.setSuperior_ids(new Integer[]{5,4,12,27});
@@ -69,15 +67,15 @@ public class DosDetectionTest {
         long profileId = 0;
         DosEventLog result = null;
         if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent) {
-            profileId = dosDetectionThreshold.getProfile_id();
+            profileId = dosDetectionThreshold.getId();
             result = getDosEventLog(dosSketchLog, sessionBase, diffSession, profileId, STATIC_CONDITION_TYPE, SESSIONS_TAG);
             System.out.println(result);
         } else if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent) {
-            profileId = dosDetectionThreshold.getProfile_id();
+            profileId = dosDetectionThreshold.getId();
             result = getDosEventLog(dosSketchLog, pktBase, diffPkt, profileId, STATIC_CONDITION_TYPE, PACKETS_TAG);
             System.out.println(result);
         } else if (diffBitPercent >= diffPktPercent && diffBitPercent >= diffSessionPercent) {
-            profileId = dosDetectionThreshold.getProfile_id();
+            profileId = dosDetectionThreshold.getId();
             result = getDosEventLog(dosSketchLog, bitBase, diffByte, profileId, STATIC_CONDITION_TYPE, BITS_TAG);
             System.out.println(result);
         }
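The chain above always reports the single dimension (sessions, packets, bits) whose relative excess over its threshold is largest, with ties resolved in sessions, packets, bits order. A condensed restatement of that selection, reusing the percentage names from the test:

public class MetricSelectSketch {
    // Equivalent to the if/else chain above, including its tie-breaking order.
    static String dominantMetric(double diffSessionPercent, double diffPktPercent, double diffBitPercent) {
        if (diffSessionPercent >= diffPktPercent && diffSessionPercent >= diffBitPercent) {
            return "sessions";
        }
        if (diffPktPercent >= diffSessionPercent && diffPktPercent >= diffBitPercent) {
            return "packets";
        }
        return "bits";
    }
}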
@@ -174,10 +172,10 @@ public class DosDetectionTest {
         String[] ipArr = sourceIpList.split(",");
         HashSet<String> countrySet = new HashSet<>();
         for (String ip : ipArr) {
-            String country = IpLookupUtils.getCountryLookup(ip);
-            if (StringUtil.isNotBlank(country)) {
-                countrySet.add(country);
-            }
+            // String country = IpLookupUtils.getCountryLookup(ip);
+            // if (StringUtil.isNotBlank(country)) {
+            //     countrySet.add(country);
+            // }
         }
         countryList = StringUtils.join(countrySet, ", ");
         return countryList;