package com.zdjizhi.etl.connection;

import cn.hutool.core.convert.Convert;
import com.zdjizhi.etl.LogService;
import com.zdjizhi.etl.dns.SketchTimeMapFunction;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;

import static com.zdjizhi.common.FlowWriteConfig.*;

public class ConnLogService {

    public static void connLogStream(StreamExecutionEnvironment env) throws Exception {
        // connection
        DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
        // sketch
        DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);

        // Write the raw logs to the ClickHouse sinks (batched)
        LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
        LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);

        // transform
        DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);
        // Write connection relations to the ClickHouse relation table
        LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);

        DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);

        // Union the connection and connection-sketch streams
        DataStream<Map<String, Object>> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream);

        // Write the IP-to-IP graph to ArangoDB
        LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
    }

    /**
     * Consumes raw connection logs from Kafka.
     *
     * @param env    the Flink execution environment
     * @param source the Kafka source topic
     * @return the filtered, timestamp-normalized source stream
     */
    private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
        String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";
        DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
                .setParallelism(SOURCE_PARALLELISM)
                // Drop null records and records without a positive start time
                .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0)
                .setParallelism(SOURCE_PARALLELISM)
                .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)
                        ? new ConnTimeMapFunction()
                        : new SketchTimeMapFunction())
                .setParallelism(SOURCE_PARALLELISM)
                .name(source)
                .setParallelism(SOURCE_PARALLELISM);
        return sourceStream;
    }

    /**
     * Aggregates connection logs per IP key over tumbling windows.
     */
    private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
        DataStream<Map<String, Object>> connTransformStream = connSource
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                        // conn_start_time is in seconds; Flink timestamps are in milliseconds
                        .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
                .setParallelism(TRANSFORM_PARALLELISM)
                .keyBy(new IpKeysSelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                .process(new ConnProcessFunction())
                .setParallelism(TRANSFORM_PARALLELISM);
        return connTransformStream;
    }

    /**
     * Aggregates connection-sketch logs per IP key over tumbling windows.
     */
    private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
        DataStream<Map<String, Object>> sketchTransformStream = sketchSource
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                        .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
                .keyBy(new IpKeysSelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                .process(new SketchProcessFunction());
        return sketchTransformStream;
    }

    /**
     * Unions the two aggregated streams and builds IP-to-IP graph edges.
     */
    private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream,
                                                                DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
        DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
                .keyBy(new IpKeysSelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                .process(new Ip2IpGraphProcessFunction())
                .setParallelism(TRANSFORM_PARALLELISM);
        return ip2ipGraph;
    }
}
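// Usage sketch (not part of the original file): how this pipeline might be wired up
// from a job entry point. The entry point and the job name "conn-log-etl" are
// assumptions; only ConnLogService.connLogStream(env) comes from this file.
//
//     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//     ConnLogService.connLogStream(env);
//     env.execute("conn-log-etl");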