package com.zdjizhi.etl;

import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosSketchLog;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;

import static com.zdjizhi.sink.OutputStreamSink.outputTag;

/**
 * Window function that folds the {@link DosSketchLog} records of one window into a single
 * intermediate {@link DosSketchLog}.
 *
 * <p>The key is a two-field tuple: field 0 is written to the result's attack type and field 1 to
 * its destination IP. The aggregated record is emitted on the main output, and a metric derived
 * from it by {@code TrafficServerIpMetrics.getOutputMetric} is emitted on the {@code outputTag}
 * side output.
 *
 * <p>NOTE(review): the generic type arguments below were reconstructed from how the values are
 * used in this class (key fields passed to the attack-type / destination-IP setters, counters
 * computed as {@code long}); confirm them against {@code DosSketchLog} and the job wiring.
 *
 * @author 94976
 */
public class EtlProcessFunction
        extends ProcessWindowFunction<DosSketchLog, DosSketchLog, Tuple2<String, String>, TimeWindow> {

    private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class);

    private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0";
    private static final String EMPTY_SOURCE_IP_IPV6 = "::";

    /**
     * Duration reported when the computed start and end times coincide.
     * NOTE(review): presumably the sketch reporting interval in seconds -- confirm.
     */
    private static final long FALLBACK_DURATION = 5;

    /**
     * Aggregates the window's records and emits the result on the main output and the
     * {@code outputTag} side output. Nothing is emitted when aggregation fails.
     *
     * @param keys     window key; field 0 is used as attack type, field 1 as destination IP
     * @param context  window context, used for the side output
     * @param elements records collected in the window
     * @param out      collector for the aggregated record
     */
    @Override
    public void process(Tuple2<String, String> keys,
                        Context context,
                        Iterable<DosSketchLog> elements,
                        Collector<DosSketchLog> out) {
        DosSketchLog middleResult = getMiddleResult(keys, elements);
        try {
            if (middleResult != null) {
                out.collect(middleResult);
                logger.debug("获取中间聚合结果:{}", middleResult);
                context.output(outputTag, TrafficServerIpMetrics.getOutputMetric(middleResult));
            }
        } catch (Exception e) {
            // The exception is passed as the trailing throwable (no placeholder) so the logging
            // backend prints the stack trace instead of formatting it into the message.
            logger.error("获取中间聚合结果失败,middleResult: {}", middleResult, e);
        }
    }

    /**
     * Builds the intermediate result record for one key from the aggregated window values.
     *
     * @param keys     window key; field 0 is used as attack type, field 1 as destination IP
     * @param elements records collected in the window
     * @return the populated record, or {@code null} when aggregation or population failed
     */
    private DosSketchLog getMiddleResult(Tuple2<String, String> keys, Iterable<DosSketchLog> elements) {
        DosSketchLog midResultLog = new DosSketchLog();
        Tuple6<Long, Long, Long, String, Long, Long> values = sketchAggregate(elements);
        try {
            if (values != null) {
                midResultLog.setAttack_type(keys.f0);
                midResultLog.setDestination_ip(keys.f1);
                midResultLog.setSketch_start_time(values.f4);
                midResultLog.setSketch_duration(values.f5);
                midResultLog.setSource_ip(values.f3);
                midResultLog.setSketch_sessions(values.f0);
                midResultLog.setSketch_packets(values.f1);
                midResultLog.setSketch_bytes(values.f2);
                return midResultLog;
            }
        } catch (Exception e) {
            logger.error("加载中间结果集失败,keys: {} values: {}", keys, values, e);
        }
        return null;
    }

    /**
     * Aggregates the records of one window.
     *
     * <p>Records whose source IP is {@code 0.0.0.0} or {@code ::} contribute to the session,
     * packet and byte totals and to the start-time / duration computation. Every other record
     * only contributes its source IP to a de-duplicated list, capped at
     * {@code CommonConfig.SOURCE_IP_LIST_LIMIT} entries; null or blank source IPs are skipped.
     *
     * @param elements records collected in the window
     * @return a tuple of (sessions, packets, bytes * 8, comma-joined source IPs, start time,
     *         duration), where the first three are divided by
     *         {@code CommonConfig.FLINK_WINDOW_MAX_TIME}; {@code null} when aggregation throws
     */
    private Tuple6<Long, Long, Long, String, Long, Long> sketchAggregate(Iterable<DosSketchLog> elements) {
        long sessions = 0;
        long packets = 0;
        long bytes = 0;
        // Read the clock once so both bounds start from the same second.
        long now = System.currentTimeMillis() / 1000;
        long startTime = now;
        // NOTE(review): endTime is seeded with the current time, so it only moves when a record
        // starts in the future and the duration is normally "now - earliest start". Confirm this
        // is intended rather than the span between the earliest and latest record.
        long endTime = now;
        long duration = 0;
        HashSet<String> sourceIpSet = new HashSet<>();
        try {
            for (DosSketchLog sketchLog : elements) {
                String sourceIp = sketchLog.getSource_ip();
                boolean isSummaryRecord = StringUtils.equals(sourceIp, EMPTY_SOURCE_IP_IPV4)
                        || StringUtils.equals(sourceIp, EMPTY_SOURCE_IP_IPV6);
                if (isSummaryRecord) {
                    sessions += sketchLog.getSketch_sessions();
                    packets += sketchLog.getSketch_packets();
                    bytes += sketchLog.getSketch_bytes();
                    long logStartTime = sketchLog.getSketch_start_time();
                    startTime = Math.min(startTime, logStartTime);
                    endTime = Math.max(endTime, logStartTime);
                    long span = endTime - startTime;
                    duration = span == 0 ? FALLBACK_DURATION : span;
                } else if (StringUtils.isBlank(sourceIp)) {
                    // A missing source IP would take a slot of the limit and leave an empty
                    // token in the joined list, so it is ignored.
                    continue;
                } else if (sourceIpSet.size() < CommonConfig.SOURCE_IP_LIST_LIMIT) {
                    sourceIpSet.add(sourceIp);
                }
            }
            String sourceIpList = StringUtils.join(sourceIpSet, ",");
            return Tuple6.of(
                    sessions / CommonConfig.FLINK_WINDOW_MAX_TIME,
                    packets / CommonConfig.FLINK_WINDOW_MAX_TIME,
                    bytes * 8 / CommonConfig.FLINK_WINDOW_MAX_TIME,
                    sourceIpList,
                    startTime,
                    duration);
        } catch (Exception e) {
            logger.error("聚合中间结果集失败", e);
        }
        return null;
    }
}