diff --git a/pom.xml b/pom.xml
index 9bf8d43..b7c3647 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,13 +114,6 @@
1.7.21
-
- org.apache.flink
- flink-sql-connector-kafka_2.11
- ${flink.version}
- provided
-
-
org.apache.flink
flink-connector-kafka_2.11
@@ -128,36 +121,6 @@
-
-
- org.apache.kafka
- kafka-clients
- 1.0.0
-
-
-
-
- org.apache.flink
- flink-table-api-java
- ${flink.version}
-
-
-
-
-
- org.apache.flink
- flink-table-planner-blink_2.11
- ${flink.version}
-
-
-
-
- org.apache.flink
- flink-table-planner_2.11
- ${flink.version}
-
-
-
org.apache.flink
@@ -181,6 +144,23 @@
+
+
+ org.apache.hbase
+ hbase-client
+ 2.2.3
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ log4j-over-slf4j
+ org.slf4j
+
+
+
+
org.apache.zookeeper
zookeeper
@@ -203,12 +183,6 @@
4.5.6
-
- org.apache.flink
- flink-connector-hbase-2.2_2.11
- ${flink.version}
-
-
cn.hutool
hutool-all
diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java
index aa64f95..c4603ff 100644
--- a/src/main/java/com/zdjizhi/common/CommonConfig.java
+++ b/src/main/java/com/zdjizhi/common/CommonConfig.java
@@ -19,6 +19,8 @@ public class CommonConfig {
public static final String KAFKA_OUTPUT_METRIC_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.metric.topic.name");
public static final int KAFKA_OUTPUT_EVENT_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.event.parallelism");
public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.event.topic.name");
+ public static final String KAFKA_OUTPUT_SKETCH_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.sketch.topic.name");
+ public static final int KAFKA_OUTPUT_SKETCH_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.sketch.parallelism");
public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.output.bootstrap.servers");
public static final String HBASE_ZOOKEEPER_QUORUM = CommonConfigurations.getStringProperty("hbase.zookeeper.quorum");
@@ -39,6 +41,8 @@ public class CommonConfig {
public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path");
+ public static final int SENSITIVITY_THRESHOLD = CommonConfigurations.getIntProperty("sensitivity.threshold");
+
public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.minor.threshold");
public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.warning.threshold");
public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.major.threshold");
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java
index 22dc76d..d01c9c2 100644
--- a/src/main/java/com/zdjizhi/etl/DosDetection.java
+++ b/src/main/java/com/zdjizhi/etl/DosDetection.java
@@ -53,10 +53,11 @@ public class DosDetection extends RichMapFunction {
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType);
- if (baselineMap.containsKey(destinationIp)) {
+ long sketchSessions = value.getSketch_sessions();
+ if (sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD && baselineMap.containsKey(destinationIp)) {
Tuple2, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(floodTypeTup, value);
- long diff = value.getSketch_sessions() - base;
+ long diff = sketchSessions - base;
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
Severity severity = judgeSeverity(percent);
diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
index f43a420..464b5f6 100644
--- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java
@@ -80,7 +80,7 @@ public class EtlProcessFunction extends ProcessWindowFunction sketchSource){
+ sketchSource
+ .filter(Objects::nonNull)
+ .map(JsonMapper::toJsonString)
+ .addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_SKETCH_TOPIC_NAME))
+ .setParallelism(CommonConfig.KAFKA_OUTPUT_SKETCH_PARALLELISM);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
index 6bac8c8..5789571 100644
--- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
+++ b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
@@ -27,6 +27,9 @@ public class OutputStreamSink {
public static void finalOutputSink(){
try {
+ SingleOutputStreamOperator sketchSource = ParseSketchLog.getSketchSource();
+ DosSketchSink.dosSketchOutputSink(sketchSource);
+
SingleOutputStreamOperator middleStream = getMiddleStream();
DosEventSink.dosEventOutputSink(getEventSinkStream(middleStream));
TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);
diff --git a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
index 6e8e02c..716c7c7 100644
--- a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
+++ b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
@@ -2,8 +2,6 @@ package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
@@ -12,16 +10,8 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkEnvironmentUtils {
public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
- public static StreamTableEnvironment getStreamTableEnv() {
+ static {
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
-
- EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
- .inStreamingMode()
- .build();
-
- return StreamTableEnvironment.create(streamExeEnv, settings);
}
diff --git a/src/main/java/com/zdjizhi/utils/KafkaUtils.java b/src/main/java/com/zdjizhi/utils/KafkaUtils.java
index 954a406..3c7e67a 100644
--- a/src/main/java/com/zdjizhi/utils/KafkaUtils.java
+++ b/src/main/java/com/zdjizhi/utils/KafkaUtils.java
@@ -11,6 +11,9 @@ public class KafkaUtils {
private static Properties getKafkaSinkProperty(){
Properties propertiesproducer = new Properties();
propertiesproducer.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
+// propertiesproducer.setProperty("security.protocol", "SASL_PLAINTEXT");
+// propertiesproducer.setProperty("sasl.mechanism", "PLAIN");
+// propertiesproducer.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";");
return propertiesproducer;
}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index bee457b..072c787 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -15,7 +15,7 @@ kafka.input.topic.name=DOS-SKETCH-LOG
kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#读取kafka group id
-kafka.input.group.id=2108161121
+kafka.input.group.id=2109061121
#kafka.input.group.id=dos-detection-job-210813-1
#发送kafka metrics并行度大小
@@ -32,8 +32,13 @@ kafka.output.event.parallelism=1
#kafka.output.event.topic.name=DOS-EVENT-LOG
kafka.output.event.topic.name=test
+#sketch日志 topic名以及并行度
+#kafka.output.sketch.topic.name=FLATTEN-DOS-SKETCH-LOG
+kafka.output.sketch.topic.name=test
+kafka.output.sketch.parallelism=1
+
#kafka输出地址
-kafka.output.bootstrap.servers=192.168.44.12:9092
+kafka.output.bootstrap.servers=192.168.44.33:9092
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#zookeeper地址
@@ -72,8 +77,10 @@ data.center.id.num=15
#IP mmdb库路径
ip.mmdb.path=D:\\data\\dat\\
-#ip.mmdb.path=/home/bigdata/topology/dat/
-#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
+#ip.mmdb.path=ip.mmdb.path=/home/ceiec/topology/dat/
+
+#敏感阈值,速率小于此值不报警
+sensitivity.threshold=100
#基于baseline判定dos攻击的上下限
baseline.sessions.minor.threshold=0.1
diff --git a/src/test/java/com/zdjizhi/common/UdtfTest.java b/src/test/java/com/zdjizhi/common/UdtfTest.java
deleted file mode 100644
index 479febe..0000000
--- a/src/test/java/com/zdjizhi/common/UdtfTest.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.zdjizhi.common;
-
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.types.Row;
-
-public class UdtfTest extends TableFunction {
-
- public void eval(Row[] rows) {
- for (Row row : rows) {
- collect(row);
- }
- }
-
- public static void main(String[] args) {
-
- }
-}