package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.common.StreamAggregateConfig; import com.zdjizhi.utils.functions.*; import com.zdjizhi.utils.kafka.KafkaConsumer; import com.zdjizhi.utils.kafka.KafkaProducer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; /** * @author qidaijie * @Package com.zdjizhi.topology * @Description: * @date 2021/5/2016:42 */ public class StreamAggregateTopology { private static final Log logger = LogFactory.get(); public static void main(String[] args) { try { final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //两个输出之间的最大时间 (单位milliseconds) environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT); DataStream streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()) .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM).name(StreamAggregateConfig.SOURCE_KAFKA_TOPIC); SingleOutputStreamOperator> parseDataMap = streamSource.map(new ParseMapFunction()) .name("ParseDataMap") .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM); WindowedStream, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction()) .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.FIRST_COUNT_WINDOW_TIME))); SingleOutputStreamOperator> metricCountWindow = firstWindow.process(new FirstCountWindowFunction()) .name("FirstCountWindow") .setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM); WindowedStream, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction()) .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.SECOND_COUNT_WINDOW_TIME))); SingleOutputStreamOperator secondCountWindow = secondWindow.process(new SecondCountWindowFunction()) .name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM); SingleOutputStreamOperator resultFlatMap = secondCountWindow.flatMap(new ResultFlatMapFunction()) .name("ResultFlatMap").setParallelism(StreamAggregateConfig.SINK_PARALLELISM); resultFlatMap.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka") .setParallelism(StreamAggregateConfig.SINK_PARALLELISM).name(StreamAggregateConfig.SINK_KAFKA_TOPIC); environment.execute(args[0]); } catch (Exception e) { logger.error("This Flink task start ERROR! Exception information is :" + e); } } }