package com.zdjizhi.topology;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.StreamAggregateConfig;
import com.zdjizhi.utils.functions.keyby.FirstKeyByFunction;
import com.zdjizhi.utils.functions.keyby.SecondKeyByFunction;
import com.zdjizhi.utils.functions.parse.ParseMapFunction;
import com.zdjizhi.utils.functions.result.ResultFlatMapFunction;
import com.zdjizhi.utils.functions.statistics.FirstCountWindowFunction;
import com.zdjizhi.utils.functions.statistics.SecondCountWindowFunction;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.Map;

/**
 * @author qidaijie
 * @Package com.zdjizhi.topology
 * @Description: Two-stage windowed aggregation: parse raw logs, pre-aggregate under a
 * salted key to mitigate data skew, merge under the business key, and sink to Kafka.
 * @date 2021/5/20 16:42
 */
public class StreamAggregateTopology {

    private static final Log logger = LogFactory.get();

    public static void main(String[] args) {
        try {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            // Maximum time between two output buffer flushes (in milliseconds)
            environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT);

            // Source: read raw logs from Kafka
            DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
                    .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM)
                    .name(StreamAggregateConfig.SOURCE_KAFKA_TOPIC);

            // Parse raw logs and attach a custom (salted) key to mitigate data skew
            SingleOutputStreamOperator<Tuple3<String, String, Map<String, Object>>> parseDataMap =
                    streamSource.map(new ParseMapFunction())
                            .name("ParseDataMap")
                            .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);

            // First-stage aggregation, keyed by the custom key to spread hot keys
            WindowedStream<Tuple3<String, String, Map<String, Object>>, String, TimeWindow> firstWindow =
                    parseDataMap.keyBy(new FirstKeyByFunction())
                            .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.FIRST_COUNT_WINDOW_TIME)));

            // First aggregation window
            SingleOutputStreamOperator<Tuple2<String, Map<String, Object>>> metricCountWindow =
                    firstWindow.process(new FirstCountWindowFunction())
                            .name("FirstCountWindow")
                            .setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM);

            // Second-stage aggregation, keyed by the business key to merge the pre-aggregates
            WindowedStream<Tuple2<String, Map<String, Object>>, String, TimeWindow> secondWindow =
                    metricCountWindow.keyBy(new SecondKeyByFunction())
                            .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.SECOND_COUNT_WINDOW_TIME)));

            // Second aggregation window
            SingleOutputStreamOperator<Map<String, Object>> secondCountWindow =
                    secondWindow.process(new SecondCountWindowFunction())
                            .name("SecondCountWindow")
                            .setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);

            // Flatten the result data, emitting one record per protocol id
            SingleOutputStreamOperator<String> resultFlatMap =
                    secondCountWindow.flatMap(new ResultFlatMapFunction())
                            .name("ResultFlatMap")
                            .setParallelism(StreamAggregateConfig.SINK_PARALLELISM);

            // Sink to Kafka
            resultFlatMap.addSink(KafkaProducer.getKafkaProducer())
                    .setParallelism(StreamAggregateConfig.SINK_PARALLELISM)
                    .name(StreamAggregateConfig.SINK_KAFKA_TOPIC);

            // Guard against a missing job-name argument instead of throwing ArrayIndexOutOfBoundsException
            environment.execute(args.length > 0 ? args[0] : "StreamAggregateTopology");
        } catch (Exception e) {
            logger.error(e, "This Flink task start ERROR!");
        }
    }
}
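
/*
 * For illustration only: a minimal sketch of what the two key selectors behind the
 * skew-mitigation pattern above could look like. The real FirstKeyByFunction and
 * SecondKeyByFunction implementations are not shown in this file; the field layout
 * (f0 = salted key, f1 = business key for the Tuple3; f0 = business key for the
 * Tuple2) is an assumption inferred from the stream types, not the project's code.
 */
class ExampleSkewMitigationKeySelectors {

    /**
     * First stage: key by the pre-salted field (e.g. "businessKey#7", assumed to be
     * attached by ParseMapFunction) so one hot business key spreads across
     * several parallel window instances.
     */
    static class SaltedKeySelector
            implements org.apache.flink.api.java.functions.KeySelector<Tuple3<String, String, Map<String, Object>>, String> {
        @Override
        public String getKey(Tuple3<String, String, Map<String, Object>> value) {
            return value.f0; // salted key
        }
    }

    /**
     * Second stage: key by the plain business key so the per-salt pre-aggregates
     * produced by FirstCountWindowFunction merge back into one final result.
     */
    static class BusinessKeySelector
            implements org.apache.flink.api.java.functions.KeySelector<Tuple2<String, Map<String, Object>>, String> {
        @Override
        public String getKey(Tuple2<String, Map<String, Object>> value) {
            return value.f0; // business key
        }
    }
}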