DoS 检测支持vsys id
This commit is contained in:
@@ -9,7 +9,7 @@ import com.zdjizhi.etl.EtlProcessFunction;
|
||||
import com.zdjizhi.etl.ParseSketchLog;
|
||||
import com.zdjizhi.utils.FlinkEnvironmentUtils;
|
||||
import org.apache.flink.api.java.functions.KeySelector;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.apache.flink.streaming.api.datastream.*;
|
||||
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
|
||||
import org.apache.flink.streaming.api.windowing.time.Time;
|
||||
@@ -37,7 +37,7 @@ public class OutputStreamSink {
|
||||
}
|
||||
|
||||
private static SingleOutputStreamOperator<DosEventLog> getEventSinkStream(SingleOutputStreamOperator<DosSketchLog> middleStream){
|
||||
return middleStream.map(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
|
||||
return middleStream.process(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM);
|
||||
}
|
||||
|
||||
private static SingleOutputStreamOperator<DosSketchLog> getMiddleStream(){
|
||||
@@ -48,12 +48,13 @@ public class OutputStreamSink {
|
||||
.setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM);
|
||||
}
|
||||
|
||||
private static class KeysSelector implements KeySelector<DosSketchLog, Tuple2<String, String>>{
|
||||
private static class KeysSelector implements KeySelector<DosSketchLog, Tuple3<String, String, Integer>>{
|
||||
@Override
|
||||
public Tuple2<String, String> getKey(DosSketchLog dosSketchLog){
|
||||
return Tuple2.of(
|
||||
public Tuple3<String, String, Integer> getKey(DosSketchLog dosSketchLog){
|
||||
return Tuple3.of(
|
||||
dosSketchLog.getAttack_type(),
|
||||
dosSketchLog.getDestination_ip());
|
||||
dosSketchLog.getDestination_ip(),
|
||||
dosSketchLog.getVsys_id());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user