package com.zdjizhi.etl.connection;

import cn.hutool.core.convert.Convert;
import com.alibaba.fastjson.util.TypeUtils;
import com.arangodb.entity.BaseEdgeDocument;
import com.zdjizhi.etl.LogService;
import com.zdjizhi.utils.arangodb.AGSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;

import static com.zdjizhi.common.FlowWriteConfig.*;

/**
 * Flink streaming topology for connection ("conn") and connection-sketch logs.
 *
 * <p>Pipeline: consume raw logs from Kafka, filter out records with invalid
 * timestamps/counters, aggregate them in tumbling windows keyed by IP pair,
 * then fan out to ClickHouse / Kafka / ArangoDB sinks depending on the
 * {@code SINK_*_OPEN} configuration switches from {@code FlowWriteConfig}.
 */
public class ConnLogService {

    /**
     * Wires the whole conn/sketch stream topology onto the given environment.
     *
     * <p>Sink selection by {@code SINK_CK_RAW_LOG_INSERT_OPEN}:
     * 1 = raw + relation logs to ClickHouse; 2 = raw + relation logs to Kafka;
     * 3 = relation logs only, to Kafka. Independently,
     * {@code SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1} adds the ip-to-ip graph
     * edge sink into ArangoDB.
     *
     * @param env Flink stream execution environment to register operators on
     * @throws Exception propagated from source/sink construction
     */
    public static void connLogStream(StreamExecutionEnvironment env) throws Exception {
        // connection raw-log source
        DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
        // sketch raw-log source
        DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);
        // windowed aggregation of connection logs (relation stream)
        DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);

        if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
            // write raw logs to ClickHouse (batched inside the sink)
            LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
            LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
            // write aggregated conn relations to the ClickHouse relation table
            LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2) {
            LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION);
            LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH);
            LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
        } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3) {
            LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
        }

        if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) {
            DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
            // union conn + sketch aggregates into ip-to-ip graph edges
            DataStream<BaseEdgeDocument> ip2ipGraph =
                    getConnUnion(connTransformStream, sketchTransformStream);
            // write edges to ArangoDB
            ConnLogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP);
        }
    }

    /**
     * Builds a filtered Kafka source for either the connection or sketch topic.
     *
     * <p>Drops records that are null, have a non-positive start timestamp, or
     * whose packet/byte/session counters are negative or equal to
     * {@code Long.MAX_VALUE} (sentinel for corrupt counters). Records from any
     * other topic name are rejected outright.
     *
     * @param env    execution environment the source is registered on
     * @param source Kafka topic name (connection or sketch topic)
     * @return filtered, time-normalized stream of log maps
     * @throws Exception propagated from consumer construction
     */
    private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env,
                                                                String source) throws Exception {
        // each topic carries its own start-time field name
        String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)
                ? "conn_start_time" : "sketch_start_time";
        DataStream<Map<String, Object>> filterStream =
                env.addSource(KafkaConsumer.myDeserializationConsumer(source))
                        .setParallelism(SOURCE_PARALLELISM)
                        .filter(x -> {
                            if (Objects.isNull(x) || Convert.toLong(x.get(timeFilter)) <= 0) {
                                return false;
                            }
                            if (SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)) {
                                // reject corrupt packet/byte counters
                                return isValidCounter(x, "total_cs_pkts")
                                        && isValidCounter(x, "total_sc_pkts")
                                        && isValidCounter(x, "total_cs_bytes")
                                        && isValidCounter(x, "total_sc_bytes");
                            } else if (SOURCE_KAFKA_TOPIC_SKETCH.equals(source)) {
                                // reject corrupt sketch counters
                                return isValidCounter(x, "sketch_sessions")
                                        && isValidCounter(x, "sketch_packets")
                                        && isValidCounter(x, "sketch_bytes");
                            } else {
                                // unknown topic: drop everything
                                return false;
                            }
                        }).setParallelism(SOURCE_PARALLELISM);
        // normalize timestamps with the topic-specific map function
        DataStream<Map<String, Object>> sourceStream =
                filterStream.map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)
                                ? new ConnTimeMapFunction() : new SketchTimeMapFunction())
                        .setParallelism(SOURCE_PARALLELISM)
                        .name(source);
        return sourceStream;
    }

    /**
     * A counter field is valid when it is in {@code [0, Long.MAX_VALUE)};
     * negative values and {@code Long.MAX_VALUE} indicate corrupt data.
     */
    private static boolean isValidCounter(Map<String, Object> record, String field) {
        long value = TypeUtils.castToLong(record.get(field));
        return value >= 0 && value != Long.MAX_VALUE;
    }

    /**
     * Aggregates connection logs per IP key in tumbling windows and filters
     * out aggregates with negative session/packet/byte totals.
     *
     * <p>NOTE(review): event-time watermarks are assigned from
     * {@code conn_start_time}, but the window is
     * {@link TumblingProcessingTimeWindows} — the watermarks are unused by a
     * processing-time window. Confirm whether an event-time window was
     * intended before changing.
     *
     * @param connSource filtered connection log stream
     * @return aggregated relation stream
     * @throws Exception propagated from operator construction
     */
    private static DataStream<Map<String, Object>> getConnTransformStream(
            DataStream<Map<String, Object>> connSource) throws Exception {
        return connSource
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Map<String, Object>>forBoundedOutOfOrderness(
                                Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                        .withTimestampAssigner((event, timestamp) ->
                                Convert.toLong(event.get("conn_start_time")) * 1000))
                .setParallelism(TRANSFORM_PARALLELISM)
                .keyBy(new IpKeysSelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                .process(new ConnProcessFunction())
                .setParallelism(TRANSFORM_PARALLELISM)
                // drop windows whose aggregate counters underflowed
                .filter(x -> Objects.nonNull(x)
                        && TypeUtils.castToLong(x.get("sessions")) >= 0
                        && TypeUtils.castToLong(x.get("packets")) >= 0
                        && TypeUtils.castToLong(x.get("bytes")) >= 0)
                .setParallelism(TRANSFORM_PARALLELISM);
    }

    /**
     * Aggregates sketch logs per IP key in tumbling windows; mirrors
     * {@link #getConnTransformStream} but keyed off {@code sketch_start_time}.
     *
     * <p>NOTE(review): same watermark/processing-time-window mismatch as in
     * {@code getConnTransformStream} — confirm intent.
     *
     * @param sketchSource filtered sketch log stream
     * @return aggregated sketch stream
     * @throws Exception propagated from operator construction
     */
    private static DataStream<Map<String, Object>> getSketchTransformStream(
            DataStream<Map<String, Object>> sketchSource) throws Exception {
        DataStream<Map<String, Object>> sketchTransformStream =
                sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
                                .<Map<String, Object>>forBoundedOutOfOrderness(
                                        Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                                .withTimestampAssigner((event, timestamp) ->
                                        TypeUtils.castToLong(event.get("sketch_start_time")) * 1000))
                        .keyBy(new IpKeysSelector())
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                        .process(new SketchProcessFunction())
                        .setParallelism(TRANSFORM_PARALLELISM)
                        // drop windows whose aggregate counters underflowed
                        .filter(x -> Objects.nonNull(x)
                                && TypeUtils.castToLong(x.get("sessions")) >= 0
                                && TypeUtils.castToLong(x.get("packets")) >= 0
                                && TypeUtils.castToLong(x.get("bytes")) >= 0)
                        .setParallelism(TRANSFORM_PARALLELISM);
        return sketchTransformStream;
    }

    /**
     * Unions the conn and sketch aggregate streams and folds them into
     * ip-to-ip graph edge documents in a second tumbling window.
     *
     * @param connTransformStream   aggregated connection relations
     * @param sketchTransformStream aggregated sketch relations
     * @return stream of ArangoDB edge documents (one per IP pair per window)
     * @throws Exception propagated from operator construction
     */
    private static DataStream<BaseEdgeDocument> getConnUnion(
            DataStream<Map<String, Object>> connTransformStream,
            DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
        DataStream<BaseEdgeDocument> ip2ipGraph =
                connTransformStream.union(sketchTransformStream)
                        .keyBy(new IpKeysSelector())
                        .window(TumblingProcessingTimeWindows.of(
                                Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                        .process(new Ip2IpGraphProcessFunction())
                        .setParallelism(TRANSFORM_PARALLELISM);
        return ip2ipGraph;
    }

    /**
     * Attaches an ArangoDB sink to the edge stream.
     *
     * @param sourceStream edge documents to persist
     * @param sink         target collection name (also used as operator name)
     * @throws Exception propagated from sink construction
     */
    public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream,
                                        String sink) throws Exception {
        // fixed: original called setParallelism(SINK_PARALLELISM) twice on the
        // same sink; once is sufficient.
        sourceStream.addSink(new AGSink(sink))
                .setParallelism(SINK_PARALLELISM)
                .name(sink);
    }
}