diff --git a/pom.xml b/pom.xml index 2b6f210..1e81091 100644 --- a/pom.xml +++ b/pom.xml @@ -166,7 +166,7 @@ org.apache.hadoop hadoop-common 2.7.1 - provided + zookeeper @@ -213,6 +213,10 @@ slf4j-api org.slf4j + + hadoop-common + org.apache.hadoop + diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index 8190055..5a138a4 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -10,7 +10,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap; -import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; import java.math.BigDecimal; @@ -23,9 +23,9 @@ import java.util.concurrent.TimeUnit; /** * @author wlh */ -public class DosDetection extends ProcessFunction { +public class DosDetection extends BroadcastProcessFunction, DosEventLog> { -// private static final Logger logger = LoggerFactory.getLogger(DosDetection.class); + // private static final Logger logger = LoggerFactory.getLogger(DosDetection.class); private static final Log logger = LogFactory.get(); private static Map> baselineMap = new HashMap<>(); private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); @@ -47,6 +47,12 @@ public class DosDetection extends ProcessFunction { ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build()); try { + + super.open(parameters); + logger.info("begin init"); + IpUtils.loadIpLook(); + logger.info("init over"); + executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0, CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES); @@ -59,7 +65,7 @@ public class DosDetection extends ProcessFunction { } @Override - public void processElement(DosSketchLog value, Context ctx, Collector out) { + public void processElement(DosSketchLog value, ReadOnlyContext ctx, Collector out) { ArrayList finalResults = new ArrayList<>(); try { String destinationIp = value.getDestination_ip(); @@ -98,6 +104,11 @@ public class DosDetection extends ProcessFunction { } } + @Override + public void processBroadcastElement(Map value, Context ctx, Collector out) throws Exception { + IpUtils.updateIpLook(value); + } + private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) { long sketchSessions = value.getSketch_sessions(); Integer staticSensitivityThreshold = NacosUtils.getIntProperty("static.sensitivity.threshold"); diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java index 2ef2b1b..65d16e5 100644 --- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java +++ b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java @@ -1,25 +1,15 @@ package com.zdjizhi.etl; -import com.alibaba.nacos.api.PropertyKeyConst; import com.fasterxml.jackson.databind.JavaType; import com.zdjizhi.common.CommonConfig; -import com.zdjizhi.common.CustomFile; import com.zdjizhi.common.DosSketchLog; -import com.zdjizhi.function.BroadcastProcessFunc; import com.zdjizhi.source.DosSketchSource; import com.zdjizhi.utils.FlinkEnvironmentUtils; import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; -import org.apache.flink.streaming.api.datastream.BroadcastStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,28 +33,7 @@ public class ParseSketchLog { } private static SingleOutputStreamOperator flatSketchSource(){ - - DataStreamSource> broadcastSource=null; - Properties nacosProperties = new Properties(); - - nacosProperties.put(PropertyKeyConst.SERVER_ADDR,CommonConfig.NACOS_SERVER_ADDR); - nacosProperties.setProperty(PropertyKeyConst.USERNAME, CommonConfig.NACOS_USERNAME); - nacosProperties.setProperty(PropertyKeyConst.PASSWORD, CommonConfig.NACOS_PASSWORD); - - if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)){ - broadcastSource = DosSketchSource.broadcastSource(nacosProperties,CommonConfig.HDFS_PATH); - }else { - broadcastSource= DosSketchSource.singleBroadcastSource(nacosProperties); - } - - MapStateDescriptor descriptor = - new MapStateDescriptor<>("descriptorTest", Types.STRING, TypeInformation.of(Map.class)); - - BroadcastStream> broadcast = broadcastSource.broadcast(descriptor); -// BroadcastConnectedStream> connect = DosSketchSource.createDosSketchSource().connect(broadcast); - return DosSketchSource.createDosSketchSource() - .connect(broadcast).process(new BroadcastProcessFunc()); -// .flatMap(new FlatSketchLog()); + return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog()); } private static WatermarkStrategy createWatermarkStrategy(){ @@ -82,12 +51,14 @@ public class ParseSketchLog { long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString()); long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString()); String attackType = sketchSource.get("attack_type").toString(); + int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString()); ArrayList> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType); for (HashMap obj : reportIpList) { DosSketchLog dosSketchLog = new DosSketchLog(); dosSketchLog.setSketch_start_time(sketchStartTime); dosSketchLog.setSketch_duration(sketchDuration); dosSketchLog.setAttack_type(attackType); + dosSketchLog.setVsys_id(vsysId); String sourceIp = obj.get("source_ip").toString(); String destinationIp = obj.get("destination_ip").toString(); long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString()); diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java index de0e0ac..c187415 100644 --- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java +++ b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java @@ -2,6 +2,7 @@ package com.zdjizhi.sink; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.nacos.api.PropertyKeyConst; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.DosEventLog; import com.zdjizhi.common.DosMetricsLog; @@ -9,7 +10,11 @@ import com.zdjizhi.common.DosSketchLog; import com.zdjizhi.etl.DosDetection; import com.zdjizhi.etl.EtlProcessFunction; import com.zdjizhi.etl.ParseSketchLog; +import com.zdjizhi.source.DosSketchSource; import com.zdjizhi.utils.FlinkEnvironmentUtils; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.*; @@ -17,6 +22,9 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.OutputTag; +import java.util.Map; +import java.util.Properties; + /** * @author 94976 */ @@ -38,7 +46,28 @@ public class OutputStreamSink { } private static SingleOutputStreamOperator getEventSinkStream(SingleOutputStreamOperator middleStream){ - return middleStream.process(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM); + DataStreamSource> broadcastSource=null; + Properties nacosProperties = new Properties(); + + nacosProperties.put(PropertyKeyConst.SERVER_ADDR,CommonConfig.NACOS_SERVER_ADDR); + nacosProperties.setProperty(PropertyKeyConst.USERNAME, CommonConfig.NACOS_USERNAME); + nacosProperties.setProperty(PropertyKeyConst.PASSWORD, CommonConfig.NACOS_PASSWORD); + + if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)){ + broadcastSource = DosSketchSource.broadcastSource(nacosProperties,CommonConfig.HDFS_PATH); + }else { + broadcastSource= DosSketchSource.singleBroadcastSource(nacosProperties); + } + + MapStateDescriptor descriptor = + new MapStateDescriptor<>("descriptorTest", Types.STRING, TypeInformation.of(Map.class)); + + BroadcastStream> broadcast = broadcastSource.broadcast(descriptor); + + return middleStream + .connect(broadcast) + .process(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM); + } private static SingleOutputStreamOperator getMiddleStream(){ diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 4e9dfd0..6d49ff8 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -125,17 +125,6 @@ sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ #是否开启kafka用户认证配置,1:是;0:否 sasl.jaas.config.flag=1 -#nacos配置 -#nacos.server.addr=192.168.44.12:8848 -#nacos.namespace=public -#nacos.username=nacos -#nacos.password=nacos -#nacos.data.id=knowledge_base.json -#nacos.group=DEFAULT_GROUP -#nacos.read.timeout=5000 - - - ############################## Nacos 配置 ###################################### nacos.server.addr=192.168.44.12:8848 nacos.username=nacos @@ -152,30 +141,6 @@ nacos.static.namespace=test nacos.static.data.id=dos_detection.properties nacos.static.group=Galaxy -############################## HTTP 配置 ###################################### -#http请求相关参数 -#最大连接数 -#http.pool.max.connection=400 -# -##单路由最大连接数 -#http.pool.max.per.route=80 -# -##向服务端请求超时时间设置(单位:毫秒) -#http.pool.request.timeout=60000 -# -##向服务端连接超时时间设置(单位:毫秒) -#http.pool.connect.timeout=60000 -# -##服务端响应超时时间设置(单位:毫秒) -#http.pool.response.timeout=60000 - - -#server.uri=http://192.168.44.12:9098 -#server.path=/hos/knowledge_base_hos_bucket - - - - ############################## hos Token 配置 ###################################### hos.token=c21f969b5f03d33d43e04f8f136e7682