package com.zdjizhi.sink; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.DosEventLog; import com.zdjizhi.common.DosMetricsLog; import com.zdjizhi.common.DosSketchLog; import com.zdjizhi.etl.DosDetection; import com.zdjizhi.etl.EtlProcessFunction; import com.zdjizhi.etl.ParseSketchLog; import com.zdjizhi.source.DosSketchSource; import com.zdjizhi.utils.FlinkEnvironmentUtils; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.OutputTag; import java.util.Map; import java.util.Properties; /** * @author 94976 */ public class OutputStreamSink { // private static final Logger logger = LoggerFactory.getLogger(OutputStreamSink.class); private static final Log logger = LogFactory.get(); public static OutputTag outputTag = new OutputTag("traffic server ip metrics"){}; public static void finalOutputSink(){ try { SingleOutputStreamOperator middleStream = getMiddleStream(); DosEventSink.dosEventOutputSink(getEventSinkStream(middleStream)); TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream); FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME); } catch (Exception e) { logger.error("任务启动失败 {}",e); } } private static SingleOutputStreamOperator getEventSinkStream(SingleOutputStreamOperator middleStream){ DataStreamSource> broadcastSource=null; Properties nacosProperties = new Properties(); nacosProperties.put(PropertyKeyConst.SERVER_ADDR,CommonConfig.NACOS_SERVER_ADDR); nacosProperties.setProperty(PropertyKeyConst.USERNAME, CommonConfig.NACOS_USERNAME); nacosProperties.setProperty(PropertyKeyConst.PASSWORD, CommonConfig.NACOS_PASSWORD); if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)){ broadcastSource = DosSketchSource.broadcastSource(nacosProperties,CommonConfig.HDFS_PATH); }else { broadcastSource= DosSketchSource.singleBroadcastSource(nacosProperties); } MapStateDescriptor descriptor = new MapStateDescriptor<>("descriptorTest", Types.STRING, TypeInformation.of(Map.class)); BroadcastStream> broadcast = broadcastSource.broadcast(descriptor); return middleStream .connect(broadcast) .process(new DosDetection()).setParallelism(CommonConfig.FLINK_DETECTION_MAP_PARALLELISM); } private static SingleOutputStreamOperator getMiddleStream(){ return ParseSketchLog.getSketchSource() .keyBy(new KeysSelector()) .window(TumblingEventTimeWindows.of(Time.seconds(CommonConfig.FLINK_WINDOW_MAX_TIME))) .process(new EtlProcessFunction()) .setParallelism(CommonConfig.FLINK_FIRST_AGG_PARALLELISM); } private static class KeysSelector implements KeySelector>{ @Override public Tuple3 getKey(DosSketchLog dosSketchLog){ return Tuple3.of( dosSketchLog.getAttack_type(), dosSketchLog.getDestination_ip(), dosSketchLog.getVsys_id()); } } }