From 1fcdb79739c396727207becb24358221edcc94f7 Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Tue, 17 May 2022 13:54:43 +0800 Subject: [PATCH] =?UTF-8?q?DoS=E4=BA=8B=E4=BB=B6=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E5=AF=B9Conditions=E5=9F=BA=E4=BA=8E=E9=80=9F=E7=8E=87?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=E5=B1=9E=E6=80=A7=E5=80=BC=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/zdjizhi/etl/DosDetection.java | 2 +- .../zdjizhi/utils/FlinkEnvironmentUtils.java | 29 +++++++++++++++++++ .../java/com/zdjizhi/utils/KafkaUtils.java | 4 ++- src/main/resources/common.properties | 14 ++++----- 4 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index 1f5568d..af5bafb 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -193,7 +193,7 @@ public class DosDetection extends RichMapFunction { switch (type) { case STATIC_CONDITION_TYPE: return new StrBuilder() - .append(tag).append(" > ") + .append("Rate > ") .append(base).append(" ") .append(tag).append("/s") .toString(); diff --git a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java index e34ce28..cd628c5 100644 --- a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java +++ b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java @@ -1,6 +1,8 @@ package com.zdjizhi.utils; import com.zdjizhi.common.CommonConfig; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -12,6 +14,33 @@ public class FlinkEnvironmentUtils { static { streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM); + + /* + // 每 1000ms 开始一次 checkpoint + streamExeEnv.enableCheckpointing(CommonConfig.FLINK_WINDOW_MAX_TIME * 1000); + + // 设置模式为精确一次 (这是默认值) + streamExeEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + + // 确认 checkpoints 之间的时间会进行 500 ms + streamExeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); + + // Checkpoint 必须在一分钟内完成,否则就会被抛弃 + streamExeEnv.getCheckpointConfig().setCheckpointTimeout(60000); + + // 允许两个连续的 checkpoint 错误 + streamExeEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); + + // 同一时间只允许一个 checkpoint 进行 + streamExeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + + // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留 + streamExeEnv.getCheckpointConfig().enableExternalizedCheckpoints( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + + // 开启实验性的 unaligned checkpoints + streamExeEnv.getCheckpointConfig().enableUnalignedCheckpoints(); + */ } } diff --git a/src/main/java/com/zdjizhi/utils/KafkaUtils.java b/src/main/java/com/zdjizhi/utils/KafkaUtils.java index 6e6167a..b0312a5 100644 --- a/src/main/java/com/zdjizhi/utils/KafkaUtils.java +++ b/src/main/java/com/zdjizhi/utils/KafkaUtils.java @@ -22,12 +22,14 @@ public class KafkaUtils { } public static FlinkKafkaProducer getKafkaSink(String topic){ - return new FlinkKafkaProducer( + FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>( topic, new SimpleStringSchema(), getKafkaSinkProperty(), Optional.empty() ); + kafkaProducer.setLogFailuresOnly(true); + return kafkaProducer; } } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 1331475..92d0520 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -15,7 +15,7 @@ kafka.input.topic.name=DOS-SKETCH-RECORD kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 #读取kafka group id -kafka.input.group.id=2112080949 +kafka.input.group.id=dos-detection-job-220516-1 #kafka.input.group.id=dos-detection-job-210813-1 #发送kafka metrics并行度大小 @@ -37,8 +37,8 @@ kafka.output.bootstrap.servers=192.168.44.12:9094 #kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 #zookeeper地址 -hbase.zookeeper.quorum=192.168.44.12:2181 -#hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181 +#hbase.zookeeper.quorum=192.168.44.12:2181 +hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181 #hbase客户端处理时间 hbase.client.operation.timeout=30000 @@ -74,12 +74,12 @@ destination.ip.partition.num=10000 data.center.id.num=15 #IP mmdb库路径 -ip.mmdb.path=D:\\data\\dat\\ +ip.mmdb.path=D:\\data\\dat\\bak\\ #ip.mmdb.path=/home/bigdata/topology/dat/ #ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/ #静态敏感阈值,速率小于此值不报警 -static.sensitivity.threshold=500 +static.sensitivity.threshold=10 #基线敏感阈值 baseline.sensitivity.threshold=0.2 @@ -92,8 +92,8 @@ baseline.sessions.severe.threshold=5 baseline.sessions.critical.threshold=8 #bifang服务访问地址 -#bifang.server.uri=http://192.168.44.72:80 -bifang.server.uri=http://192.168.44.3:80 +bifang.server.uri=http://192.168.44.72:80 +#bifang.server.uri=http://192.168.44.3:80 #访问bifang只读权限token,bifang内置,无需修改 bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867