package com.zdjizhi.topology;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.config.GlobalConfig;
import com.zdjizhi.common.pojo.AppProtocol;
import com.zdjizhi.utils.functions.filter.DataTypeFilter;
import com.zdjizhi.utils.functions.keyby.DimensionKeyBy;
import com.zdjizhi.utils.functions.map.MetricsParseMap;
import com.zdjizhi.utils.functions.map.ResultFlatMap;
import com.zdjizhi.utils.functions.statistics.DispersionCountWindow;
import com.zdjizhi.utils.functions.statistics.MergeCountWindow;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author qidaijie
 * @Package com.zdjizhi.topology
 * @Description: Application-protocol statistics topology: consumes raw logs
 * from Kafka, filters and parses them, aggregates per-dimension counts in
 * tumbling processing-time windows, and writes the results back to Kafka.
 * @date 2021/5/20 16:42
 */
public class ApplicationProtocolTopology {

    private static final Log logger = LogFactory.get();

    public static void main(String[] args) {
        try {
            final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

            // Consume raw logs from Kafka.
            DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
                    .setParallelism(GlobalConfig.SOURCE_PARALLELISM).name(GlobalConfig.SOURCE_KAFKA_TOPIC);

            // Keep only application-protocol records.
            SingleOutputStreamOperator<String> appProtocolFilter = streamSource.filter(new DataTypeFilter())
                    .name("appProtocolFilter").setParallelism(GlobalConfig.SOURCE_PARALLELISM);

            // Parse each record into a (dimension key, metrics) pair.
            SingleOutputStreamOperator<Tuple2<String, AppProtocol>> parseDataMap = appProtocolFilter.map(new MetricsParseMap())
                    .name("ParseDataMap").setParallelism(GlobalConfig.PARSE_PARALLELISM);

            // Aggregate counts per dimension key in tumbling processing-time windows:
            // the reduce function pre-aggregates incrementally, the window function
            // merges and emits the final per-key result when the window fires.
            SingleOutputStreamOperator<AppProtocol> dispersionCountWindow = parseDataMap.keyBy(new DimensionKeyBy())
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(GlobalConfig.COUNT_WINDOW_TIME)))
                    .reduce(new DispersionCountWindow(), new MergeCountWindow())
                    .name("DispersionCountWindow")
                    .setParallelism(GlobalConfig.WINDOW_PARALLELISM);

            // Flatten window results into output records.
            SingleOutputStreamOperator<String> resultFlatMap = dispersionCountWindow.flatMap(new ResultFlatMap())
                    .name("ResultFlatMap").setParallelism(GlobalConfig.SINK_PARALLELISM);

            // Write results back to Kafka.
            resultFlatMap.addSink(KafkaProducer.getKafkaProducer())
                    .setParallelism(GlobalConfig.SINK_PARALLELISM).name(GlobalConfig.SINK_KAFKA_TOPIC);

            // Job name comes from the first program argument; fall back to the
            // class name so a missing argument does not crash the job.
            String jobName = args.length > 0 ? args[0] : ApplicationProtocolTopology.class.getSimpleName();
            environment.execute(jobName);
        } catch (Exception e) {
            logger.error(e, "This Flink task failed to start or execute.");
        }
    }
}
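
/*
 * The user-defined functions wired into the topology above (DataTypeFilter,
 * MetricsParseMap, DimensionKeyBy, DispersionCountWindow, MergeCountWindow,
 * ResultFlatMap) are defined elsewhere in this project. The package-private
 * sketches below are NOT the project's implementations; they are minimal,
 * hypothetical examples of the Flink interfaces each operator is expected to
 * implement, given the stream types used above. The AppProtocol accessors
 * (getCount()/setCount()) and the "app_protocol" type marker are assumptions
 * made for illustration only.
 */

/** Hypothetical sketch: keeps only records tagged as application-protocol logs. */
class DataTypeFilterSketch implements org.apache.flink.api.common.functions.FilterFunction<String> {
    @Override
    public boolean filter(String rawLog) {
        // Assumed convention: raw logs carry a data-type marker in the line.
        return rawLog != null && rawLog.contains("app_protocol");
    }
}

/** Hypothetical sketch: parses a raw log into a (dimension key, metrics) pair. */
class MetricsParseMapSketch implements
        org.apache.flink.api.common.functions.MapFunction<String, Tuple2<String, AppProtocol>> {
    @Override
    public Tuple2<String, AppProtocol> map(String rawLog) {
        AppProtocol metrics = new AppProtocol(); // assumes a no-arg constructor
        // Real parsing lives in MetricsParseMap; here the raw line stands in as the key.
        return Tuple2.of(rawLog, metrics);
    }
}

/** Hypothetical sketch: incrementally pre-aggregates counts within a window pane. */
class DispersionCountWindowSketch implements
        org.apache.flink.api.common.functions.ReduceFunction<Tuple2<String, AppProtocol>> {
    @Override
    public Tuple2<String, AppProtocol> reduce(Tuple2<String, AppProtocol> a,
                                              Tuple2<String, AppProtocol> b) {
        // Assumed: AppProtocol exposes a numeric count that can be merged.
        a.f1.setCount(a.f1.getCount() + b.f1.getCount());
        return a;
    }
}

/** Hypothetical sketch: emits the merged per-key result when the window fires. */
class MergeCountWindowSketch extends
        org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<
                Tuple2<String, AppProtocol>, AppProtocol, String,
                org.apache.flink.streaming.api.windowing.windows.TimeWindow> {
    @Override
    public void process(String key, Context context,
                        Iterable<Tuple2<String, AppProtocol>> elements,
                        org.apache.flink.util.Collector<AppProtocol> out) {
        // With an upstream ReduceFunction, the iterable holds exactly one
        // pre-aggregated element per key and window.
        out.collect(elements.iterator().next().f1);
    }
}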