diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f365dbb
--- /dev/null
+++ b/README.md
@@ -0,0 +1,10 @@ +# flink-dos-detection +## Overview +Parses and aggregates the suspected-DDoS logs emitted by the functional end, and uses static thresholds together with per-IP baselines to decide whether the traffic is a DoS attack. + +### Input: +DosSketchLog, the TopN log emitted by the functional end +### Output: +DosMetricsLog: destination-IP traffic statistics log; +DosEventLog: DoS event log, i.e. the final detection result +
diff --git a/pom.xml b/pom.xml
index 94db8b2..ea52841 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,9 @@ *:* META-INF + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA
@@ -125,13 +128,13 @@ - - - org.apache.flink - flink-table - ${flink.version} - - + + + + + + +
@@ -187,16 +190,13 @@ zookeeper org.apache.zookeeper + + jdk.tools + jdk.tools + - - - - - - - org.apache.zookeeper zookeeper
@@ -245,8 +245,6 @@ - -
diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java
index 81fd3b6..0985a11 100644
--- a/src/main/java/com/zdjizhi/common/CommonConfig.java
+++ b/src/main/java/com/zdjizhi/common/CommonConfig.java
@@ -8,6 +8,7 @@ import com.zdjizhi.utils.CommonConfigurations; public class CommonConfig { public static final int STREAM_EXECUTION_ENVIRONMENT_PARALLELISM = CommonConfigurations.getIntProperty("stream.execution.environment.parallelism"); + public static final String STREAM_EXECUTION_JOB_NAME = CommonConfigurations.getStringProperty("stream.execution.job.name"); public static final int KAFKA_INPUT_PARALLELISM = CommonConfigurations.getIntProperty("kafka.input.parallelism"); public static final String KAFKA_INPUT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.input.topic.name");
@@ -22,12 +23,10 @@ public class CommonConfig { public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.output.bootstrap.servers"); public static final String HBASE_ZOOKEEPER_QUORUM = CommonConfigurations.getStringProperty("hbase.zookeeper.quorum"); - public static final String HBASE_ZOOKEEPER_CLIENT_PORT = CommonConfigurations.getStringProperty("hbase.zookeeper.client.port"); public static final int HBASE_CLIENT_OPERATION_TIMEOUT = CommonConfigurations.getIntProperty("hbase.client.operation.timeout"); public static final int HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = CommonConfigurations.getIntProperty("hbase.client.scanner.timeout.period"); public static final String HBASE_BASELINE_TABLE_NAME = CommonConfigurations.getStringProperty("hbase.baseline.table.name"); - public static final String HBASE_BASELINE_FAMLIY_NAME = CommonConfigurations.getStringProperty("hbase.baseline.famliy.name"); public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num"); public static final int FLINK_WATERMARK_MAX_ORDERNESS = CommonConfigurations.getIntProperty("flink.watermark.max.orderness");
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java
index d677dfe..fe30c70 100644
--- a/src/main/java/com/zdjizhi/etl/DosDetection.java
+++ b/src/main/java/com/zdjizhi/etl/DosDetection.java
@@ -27,43 +27,47 @@ public class DosDetection extends BroadcastProcessFunction out) throws Exception { - Map>> broadcast = ctx.getBroadcastState(OutputStreamSink.descriptor).get("broadcast-state"); - String destinationIp = value.getDestination_ip(); - String attackType = value.getAttack_type(); - logger.info("Record under evaluation: {}",value.toString()); - if (broadcast.containsKey(destinationIp)){ - List baseline = broadcast.get(destinationIp).get(attackType); - if (baseline != null && baseline.size() == BASELINESIZE){ - int timeIndex =
getCurrentTimeIndex(value.getSketch_start_time()); - Integer base = baseline.get(timeIndex); - long sketchSessions = value.getSketch_sessions(); - long diff = sketchSessions - base; - if (diff > 0){ - String percent = getDiffPercent(diff, sketchSessions); - double diffPercentDouble = getDiffPercentDouble(percent); - Severity severity = judgeSeverity(diffPercentDouble); - if (severity != Severity.NORMAL){ - DosEventLog result = getResult(value, severity, percent); - logger.info("Detected that server IP {} has an anomaly, log details {}",destinationIp,result.toString()); - out.collect(result); - }else { - logger.info("Server IP: {} shows no anomaly, log details {}",destinationIp,value.toString()); + try { + Map>> broadcast = ctx.getBroadcastState(OutputStreamSink.descriptor).get("broadcast-state"); + String destinationIp = value.getDestination_ip(); + String attackType = value.getAttack_type(); + logger.info("IP under evaluation: {}, type: {}",destinationIp,attackType); + if (broadcast.containsKey(destinationIp)){ + List baseline = broadcast.get(destinationIp).get(attackType); + if (baseline != null && baseline.size() == BASELINE_SIZE){ + int timeIndex = getCurrentTimeIndex(value.getSketch_start_time()); + Integer base = baseline.get(timeIndex); + long sketchSessions = value.getSketch_sessions(); + long diff = sketchSessions - base; + if (diff > 0){ + String percent = getDiffPercent(diff, sketchSessions); + double diffPercentDouble = getDiffPercentDouble(percent); + Severity severity = judgeSeverity(diffPercentDouble); + if (severity != Severity.NORMAL){ + DosEventLog result = getResult(value, severity, percent); + logger.info("Detected that server IP {} has a {} anomaly, log details\n {}",destinationIp,attackType,result.toString()); + out.collect(result); + }else { + logger.info("Server IP: {} shows no {} anomaly, log details {}",destinationIp,attackType,value.toString()); + } } } + }else { + logger.info("No baseline data found for server IP: {} of type {}",destinationIp,attackType); } - }else { - logger.info("No baseline data found for server IP: {}",destinationIp); + }catch (Exception e){ + logger.error("Detection failed\n {} \n{}",value,e); } }
@@ -82,9 +86,8 @@ public class DosDetection extends BroadcastProcessFunction keys,Iterable elements){ DosSketchLog midResuleLog = new DosSketchLog(); - Tuple4 values = sketchAggregate(elements); + Tuple6 values = sketchAggregate(elements); try { if (values != null){ midResuleLog.setCommon_sled_ip(keys.f0); midResuleLog.setCommon_data_center(keys.f1); midResuleLog.setDestination_ip(keys.f3); midResuleLog.setAttack_type(keys.f2); + midResuleLog.setSketch_start_time(values.f4); + midResuleLog.setSketch_duration(values.f5); midResuleLog.setSource_ip(values.f3); midResuleLog.setSketch_sessions(values.f0); midResuleLog.setSketch_packets(values.f1);
@@ -58,24 +61,28 @@ public class EtlProcessFunction extends ProcessWindowFunction - private Tuple4 sketchAggregate(Iterable elements){ + private Tuple6 sketchAggregate(Iterable elements){ int cnt = 1; long sessions = 0; long packets = 0 ; long bytes = 0; + long startTime = 0; + long duration = 0; HashSet sourceIpSet = new HashSet<>(); try { for (DosSketchLog newSketchLog : elements){ sessions += newSketchLog.getSketch_sessions(); packets += newSketchLog.getSketch_packets(); bytes += newSketchLog.getSketch_bytes(); + startTime = newSketchLog.getSketch_start_time(); + duration = newSketchLog.getSketch_duration(); cnt += 1; if (sourceIpSet.size() < CommonConfig.SOURCE_IP_LIST_LIMIT){ sourceIpSet.add(newSketchLog.getSource_ip()); } } String sourceIpList = StringUtils.join(sourceIpSet, ","); - return Tuple4.of(sessions/cnt,packets/cnt,bytes/cnt,sourceIpList); + return
Tuple6.of(sessions/cnt,packets/cnt,bytes/cnt,sourceIpList,startTime,duration); }catch (Exception e){ logger.error("Failed to aggregate the intermediate result set {}",e); }
diff --git a/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java b/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java
index 1d7a499..a43314b 100644
--- a/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java
+++ b/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java
@@ -22,7 +22,7 @@ class TrafficServerIpMetrics { dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions()); dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets()); dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes()); - logger.debug("Metric result loaded: {}",dosMetricsLog.toString()); + logger.info("Metric result loaded: {}",dosMetricsLog.toString()); return dosMetricsLog; }
diff --git a/src/main/java/com/zdjizhi/sink/DosEventSink.java b/src/main/java/com/zdjizhi/sink/DosEventSink.java
index 6ca0662..ad81931 100644
--- a/src/main/java/com/zdjizhi/sink/DosEventSink.java
+++ b/src/main/java/com/zdjizhi/sink/DosEventSink.java
@@ -6,7 +6,7 @@ import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.KafkaUtils; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -public class DosEventSink { +class DosEventSink { static void dosEventOutputSink(SingleOutputStreamOperator dosEventLogOutputStream){ dosEventLogOutputStream.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
index caddf67..953dd56 100644
--- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
+++ b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
@@ -39,26 +39,35 @@ public class OutputStreamSink { new MapTypeInfo<>(String.class, new MapTypeInfo<>(String.class, (Class>) (Class) List.class).getTypeClass())); public static void finalOutputSink(){ - DosEventSink.dosEventOutputSink(getOutputSinkStream()); - TrafficServerIpMetricsSink.sideOutputMetricsSink(getMiddleStream()); + try { + SingleOutputStreamOperator middleStream = getMiddleStream(); + SingleOutputStreamOperator dosEventLogOutputStream = getOutputSinkStream(middleStream); + DosEventSink.dosEventOutputSink(dosEventLogOutputStream); + TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream); + FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME); + } catch (Exception e) { + logger.error("Job execution failed", e); + } } public static void main(String[] args) throws Exception { - SingleOutputStreamOperator dosEventLogOutputStream = getOutputSinkStream(); + SingleOutputStreamOperator middleStream = getMiddleStream(); + SingleOutputStreamOperator dosEventLogOutputStream = getOutputSinkStream(middleStream); DosEventSink.dosEventOutputSink(dosEventLogOutputStream); - TrafficServerIpMetricsSink.sideOutputMetricsSink(getMiddleStream()); + TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream); dosEventLogOutputStream.print(); FlinkEnvironmentUtils.streamExeEnv.execute(); } - private static SingleOutputStreamOperator getOutputSinkStream(){ + private static SingleOutputStreamOperator getOutputSinkStream(SingleOutputStreamOperator middleStream){ BroadcastStream>>> broadcast = FlinkEnvironmentUtils.streamExeEnv .addSource(new BaselineSource()) .broadcast(descriptor); logger.info("Broadcast baseline state loaded successfully!"); - return getMiddleStream().keyBy(new SecondKeySelector()) + return middleStream.keyBy(new SecondKeySelector()) +//
.window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME))) .reduce(new SecondReduceFunc()) .connect(broadcast) .process(new DosDetection());
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index 804f515..76a18ab 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -1,9 +1,11 @@ stream.execution.environment.parallelism=1 +stream.execution.job.name=dos-detection-job kafka.input.parallelism=1 kafka.input.topic.name=DOS-SKETCH-LOG kafka.input.bootstrap.servers=192.168.44.12:9092 kafka.input.scan.startup.mode=latest-offset +#kafka.input.group.id=2107291738 kafka.input.group.id=test kafka.output.metric.parallelism=1
@@ -13,22 +15,21 @@ kafka.output.event.topic.name=DOS-EVENT-LOG kafka.output.bootstrap.servers=192.168.44.12:9092 hbase.zookeeper.quorum=192.168.44.12:2181 -hbase.zookeeper.client.port=2181 hbase.client.operation.timeout=30000 hbase.client.scanner.timeout.period=30000 hbase.baseline.table.name=ddos_traffic_baselines -hbase.baseline.famliy.name=TCP SYN Flood hbase.baseline.total.num=1000000 flink.watermark.max.orderness=1 -flink.window.max.time=600 +flink.window.max.time=10 source.ip.list.limit=10000 data.center.id.num=15 ip.mmdb.path=D:\\data\\dat\\ +#ip.mmdb.path=/home/bigdata/topology/dat/ baseline.sessions.minor.threshold=0.1 baseline.sessions.warning.threshold=0.5
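
For readers of the DosDetection change above: a minimal, self-contained sketch of the static-threshold classification it describes, assuming the two thresholds correspond to baseline.sessions.minor.threshold (0.1) and baseline.sessions.warning.threshold (0.5) from common.properties. Only Severity.NORMAL and the judgeSeverity/getDiffPercent names appear in the diff; the MINOR and WARNING levels and everything else here are illustrative assumptions, not the project's actual implementation.

```java
/** Severity levels; only NORMAL is visible in the diff, MINOR and WARNING are assumed from the property names. */
enum Severity { NORMAL, MINOR, WARNING }

public class SeveritySketch {

    // Static thresholds as configured in common.properties (assumed to be what CommonConfig exposes).
    static final double MINOR_THRESHOLD = 0.1;    // baseline.sessions.minor.threshold
    static final double WARNING_THRESHOLD = 0.5;  // baseline.sessions.warning.threshold

    /** Fraction of the observed sessions that exceeds the baseline, e.g. 0.4 for 500 observed vs. a baseline of 300. */
    static double diffPercent(long observedSessions, long baselineSessions) {
        long diff = observedSessions - baselineSessions;
        return diff <= 0 ? 0.0 : (double) diff / observedSessions;
    }

    /** Classify the excess against the two static thresholds; anything below the minor threshold is NORMAL. */
    static Severity judgeSeverity(double diffPercent) {
        if (diffPercent >= WARNING_THRESHOLD) {
            return Severity.WARNING;
        }
        if (diffPercent >= MINOR_THRESHOLD) {
            return Severity.MINOR;
        }
        return Severity.NORMAL;
    }

    public static void main(String[] args) {
        double p = diffPercent(500, 300);                    // 0.4: above minor, below warning
        System.out.println(p + " -> " + judgeSeverity(p));   // prints "0.4 -> MINOR"
    }
}
```

A DosEventLog is emitted only when the result is not Severity.NORMAL, which matches the out.collect branch in the diff.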
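The Tuple4 to Tuple6 change in EtlProcessFunction threads the sketch start time and duration through the window aggregation so they survive into the reduced record. Below is a simplified sketch of that aggregation, assuming the field order implied by the setters in the diff (average sessions, average packets, average bytes, joined source-IP list, start time, duration), assuming the average is intended to be taken over the number of elements in the window, and using a small stand-in class instead of the project's DosSketchLog.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.java.tuple.Tuple6;

public class SketchAggregateSketch {

    /** Minimal stand-in for the project's DosSketchLog; only the fields used here. */
    static class Sketch {
        long sessions, packets, bytes, startTime, duration;
        String sourceIp;
        Sketch(long sessions, long packets, long bytes, long startTime, long duration, String sourceIp) {
            this.sessions = sessions; this.packets = packets; this.bytes = bytes;
            this.startTime = startTime; this.duration = duration; this.sourceIp = sourceIp;
        }
    }

    static final int SOURCE_IP_LIST_LIMIT = 10_000; // source.ip.list.limit in common.properties

    /** Field order mirrors the setters in the diff: sessions, packets, bytes, source IPs, start time, duration. */
    static Tuple6<Long, Long, Long, String, Long, Long> aggregate(Iterable<Sketch> elements) {
        long count = 0, sessions = 0, packets = 0, bytes = 0, startTime = 0, duration = 0;
        Set<String> sourceIps = new HashSet<>();
        for (Sketch s : elements) {
            count++;
            sessions += s.sessions;
            packets += s.packets;
            bytes += s.bytes;
            startTime = s.startTime;   // last value wins, mirroring the assignment in the diff
            duration = s.duration;
            if (sourceIps.size() < SOURCE_IP_LIST_LIMIT) {
                sourceIps.add(s.sourceIp);
            }
        }
        if (count == 0) {
            return null;
        }
        return Tuple6.of(sessions / count, packets / count, bytes / count,
                String.join(",", sourceIps), startTime, duration);
    }

    public static void main(String[] args) {
        Tuple6<Long, Long, Long, String, Long, Long> agg = aggregate(Arrays.asList(
                new Sketch(100, 1_000, 80_000, 1_627_000_000L, 10, "10.0.0.1"),
                new Sketch(300, 3_000, 240_000, 1_627_000_000L, 10, "10.0.0.2")));
        System.out.println(agg); // averages: 200 sessions, 2000 packets, 160000 bytes
    }
}
```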
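Finally, the detection path in DosDetection.processElement is essentially a broadcast-state lookup (destination IP, then attack type, then per-time-slot baseline, matching the nested MapTypeInfo built in OutputStreamSink) followed by the threshold check sketched above. A minimal sketch of that lookup is shown here; the helper name, the null-return convention, and the bounds handling are illustrative, and how getCurrentTimeIndex maps sketch_start_time onto a slot is project-specific and not shown in the diff.

```java
import java.util.List;
import java.util.Map;

/**
 * Minimal sketch of the baseline lookup performed in DosDetection.processElement.
 * The broadcast map is keyed destination IP -> attack type -> per-time-slot baseline session counts.
 */
public class BaselineLookupSketch {

    /** Returns the expected session count for this IP, attack type, and time slot, or null if no usable baseline exists. */
    static Long expectedSessions(Map<String, Map<String, List<Integer>>> broadcast,
                                 String destinationIp, String attackType, int timeIndex) {
        Map<String, List<Integer>> byAttackType = broadcast.get(destinationIp);
        if (byAttackType == null) {
            return null; // no baseline for this server IP yet
        }
        List<Integer> baseline = byAttackType.get(attackType);
        if (baseline == null || timeIndex < 0 || timeIndex >= baseline.size()) {
            return null; // baseline missing, or not yet fully built for this attack type
        }
        return baseline.get(timeIndex).longValue();
    }
}
```

In the job itself the nested map is read with ctx.getBroadcastState(OutputStreamSink.descriptor).get("broadcast-state"), and an event is emitted only when the observed sessions exceed the returned baseline value by more than the configured thresholds.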