新增敏感阈值,过滤告警信息

修改计算平均值方式,先聚合再平均
This commit is contained in:
wanglihui
2021-09-09 09:52:19 +08:00
parent b95ca44c01
commit ca19b6c0e6
11 changed files with 67 additions and 81 deletions

View File

@@ -19,6 +19,8 @@ public class CommonConfig {
public static final String KAFKA_OUTPUT_METRIC_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.metric.topic.name");
public static final int KAFKA_OUTPUT_EVENT_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.event.parallelism");
public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.event.topic.name");
public static final String KAFKA_OUTPUT_SKETCH_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.sketch.topic.name");
public static final int KAFKA_OUTPUT_SKETCH_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.sketch.parallelism");
public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.output.bootstrap.servers");
public static final String HBASE_ZOOKEEPER_QUORUM = CommonConfigurations.getStringProperty("hbase.zookeeper.quorum");
@@ -39,6 +41,8 @@ public class CommonConfig {
public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path");
public static final int SENSITIVITY_THRESHOLD = CommonConfigurations.getIntProperty("sensitivity.threshold");
public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.minor.threshold");
public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.warning.threshold");
public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.major.threshold");

View File

@@ -53,10 +53,11 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
logger.debug("当前判断IP{}, 类型: {}", destinationIp, attackType);
if (baselineMap.containsKey(destinationIp)) {
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD && baselineMap.containsKey(destinationIp)) {
Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(floodTypeTup, value);
long diff = value.getSketch_sessions() - base;
long diff = sketchSessions - base;
if (diff > 0 && base != 0) {
double percent = getDiffPercent(diff, base);
Severity severity = judgeSeverity(percent);

View File

@@ -80,7 +80,7 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
}
}
String sourceIpList = StringUtils.join(sourceIpSet, ",");
return Tuple6.of(sessions/cnt,packets/cnt,bytes/cnt,sourceIpList,startTime,duration);
return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration);
}catch (Exception e){
logger.error("聚合中间结果集失败 {}",e);
}

View File

@@ -60,9 +60,9 @@ public class ParseSketchLog {
long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
dosSketchLog.setSource_ip(sourceIp);
dosSketchLog.setDestination_ip(destinationIp);
dosSketchLog.setSketch_sessions(sketchSessions/sketchDuration);
dosSketchLog.setSketch_packets(sketchPackets/sketchDuration);
dosSketchLog.setSketch_bytes(sketchBytes*8/sketchDuration);
dosSketchLog.setSketch_sessions(sketchSessions);
dosSketchLog.setSketch_packets(sketchPackets);
dosSketchLog.setSketch_bytes(sketchBytes);
collector.collect(dosSketchLog);
logger.debug("数据解析成功:{}",dosSketchLog.toString());
}

View File

@@ -0,0 +1,21 @@
package com.zdjizhi.sink;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosSketchLog;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.KafkaUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.util.Objects;
class DosSketchSink {
static void dosSketchOutputSink(SingleOutputStreamOperator<DosSketchLog> sketchSource){
sketchSource
.filter(Objects::nonNull)
.map(JsonMapper::toJsonString)
.addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_SKETCH_TOPIC_NAME))
.setParallelism(CommonConfig.KAFKA_OUTPUT_SKETCH_PARALLELISM);
}
}

View File

@@ -27,6 +27,9 @@ public class OutputStreamSink {
public static void finalOutputSink(){
try {
SingleOutputStreamOperator<DosSketchLog> sketchSource = ParseSketchLog.getSketchSource();
DosSketchSink.dosSketchOutputSink(sketchSource);
SingleOutputStreamOperator<DosSketchLog> middleStream = getMiddleStream();
DosEventSink.dosEventOutputSink(getEventSinkStream(middleStream));
TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);

View File

@@ -2,8 +2,6 @@ package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
@@ -12,16 +10,8 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkEnvironmentUtils {
public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
public static StreamTableEnvironment getStreamTableEnv() {
static {
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
return StreamTableEnvironment.create(streamExeEnv, settings);
}

View File

@@ -11,6 +11,9 @@ public class KafkaUtils {
private static Properties getKafkaSinkProperty(){
Properties propertiesproducer = new Properties();
propertiesproducer.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
// propertiesproducer.setProperty("security.protocol", "SASL_PLAINTEXT");
// propertiesproducer.setProperty("sasl.mechanism", "PLAIN");
// propertiesproducer.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";");
return propertiesproducer;
}