diff --git a/pom.xml b/pom.xml index 406346f..17e482f 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ 1.13.1 2.1.1 2.7.1 - 2.11 + 2.12 2.4.0 @@ -136,15 +136,15 @@ - - - + + + - - - + + + @@ -196,11 +196,11 @@ - - - - - + + + + + @@ -337,8 +337,6 @@ ${flink.version} - - com.geedgenetworks galaxy @@ -347,4 +345,4 @@ - \ No newline at end of file + diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java index 7ffd1d8..c3ebbcc 100644 --- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java @@ -6,7 +6,7 @@ import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.common.DosSketchLog; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; @@ -44,7 +44,7 @@ public class EtlProcessFunction extends ProcessWindowFunction keys,Iterable elements){ DosSketchLog midResuleLog = new DosSketchLog(); - Tuple6 values = sketchAggregate(elements); + Tuple7 values = sketchAggregate(elements); try { if (values != null){ midResuleLog.setAttack_type(keys.f0); @@ -56,6 +56,7 @@ public class EtlProcessFunction extends ProcessWindowFunction sketchAggregate(Iterable elements){ + private Tuple7 sketchAggregate(Iterable elements){ long sessions = 0; long packets = 0 ; long bytes = 0; long startTime = System.currentTimeMillis()/1000; long endTime = System.currentTimeMillis()/1000; long duration = 0; + long recvtime = 0; HashSet sourceIpSet = new HashSet<>(); try { for (DosSketchLog newSketchLog : elements){ + if (recvtime == 0){ + recvtime = newSketchLog.getCommon_recv_time(); + }else if (recvtime > newSketchLog.getCommon_recv_time()){ + recvtime = newSketchLog.getCommon_recv_time(); + } + System.out.println(newSketchLog.getCommon_recv_time()); String sourceIp = newSketchLog.getSource_ip(); if (StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV6)){ sessions += newSketchLog.getSketch_sessions(); @@ -88,9 +96,10 @@ public class EtlProcessFunction extends ProcessWindowFunction sketchSource = JSONObject.parseObject(s, HashMap.class); long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString()); diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 5ba8c9f..23e3551 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -8,14 +8,14 @@ stream.execution.job.name=DOS-DETECTION-APPLICATION kafka.input.parallelism=3 #输入kafka topic名 -kafka.input.topic.name=DOS-SKETCH-RECORD +kafka.input.topic.name=test #输入kafka地址 kafka.input.bootstrap.servers=192.168.44.12:9094 #kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 #读取kafka group id -kafka.input.group.id=dos-detection-job-221125-1 +kafka.input.group.id=dos-detection-job-221125-23132 #kafka.input.group.id=dos-detection-job-210813-1 #发送kafka metrics并行度大小 @@ -30,7 +30,7 @@ kafka.output.event.parallelism=3 #发送kafka event topic名 #kafka.output.event.topic.name=DOS-EVENT -kafka.output.event.topic.name=abcd +kafka.output.event.topic.name=dos-test #kafka输出地址 kafka.output.bootstrap.servers=192.168.44.12:9094