package com.zdjizhi.function; import com.zdjizhi.common.DosSketchLog; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; public class MetricsCalculate extends ProcessWindowFunction< DosSketchLog, // 输入类型 DosSketchLog, // 输出类型 Tuple4, // 键类型 TimeWindow> { // 窗口类型 private final Map attackTypeMapping = new HashMap<>(); private static Logger logger = LoggerFactory.getLogger(MetricsCalculate.class); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); attackTypeMapping.put("TCP SYN","TCP SYN Flood"); attackTypeMapping.put("DNS","UDP Flood"); attackTypeMapping.put("ICMP","ICMP Flood"); attackTypeMapping.put("UDP","DNS Flood"); attackTypeMapping.put("NTP","NTP Flood"); attackTypeMapping.put("","Custom Network Attack"); } @Override public void process(Tuple4 key, ProcessWindowFunction, TimeWindow>.Context context, Iterable elements, Collector out) throws Exception { for (DosSketchLog dosSketchLog: elements){ try { long duration = dosSketchLog.getEnd_timestamp_ms()-dosSketchLog.getStart_timestamp_ms(); if(duration<=0){ duration = dosSketchLog.getDuration(); dosSketchLog.setEnd_timestamp_ms(dosSketchLog.getStart_timestamp_ms()+duration); } dosSketchLog.setSession_rate(dosSketchLog.getSessions()/ (duration/1000) ); dosSketchLog.setPacket_rate(dosSketchLog.getPkts()/(duration/1000)); dosSketchLog.setBit_rate(dosSketchLog.getBytes()*8/(duration/1000)); dosSketchLog.setAttack_type(attackTypeMapping.get(dosSketchLog.getDecoded_as())); }catch (RuntimeException e){ logger.error(e.toString()); } out.collect(dosSketchLog); } } }