diff --git a/src/main/java/com/zdjizhi/enums/LogMetadata.java b/src/main/java/com/zdjizhi/enums/LogMetadata.java index 7c501b5..576b846 100644 --- a/src/main/java/com/zdjizhi/enums/LogMetadata.java +++ b/src/main/java/com/zdjizhi/enums/LogMetadata.java @@ -18,7 +18,12 @@ public enum LogMetadata { * 日志名称,表名,字段 * */ - CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{"cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status", "dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy", "user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent", "http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num", "imei", "imsi"}), + CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{ + "cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status", + "dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy", + "user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent", + "http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num", + "imei", "imsi"}), CONNECTION_RELATION_LOG("connection_relation_log", "connection_relation_log_local", new String[]{"start_time", "end_time", "src_ip", "dst_ip", "sessions", "packets", "bytes"}), CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log_local", new String[]{"sled_ip", "sketch_start_time", "sketch_duration", "src_ip", "dst_ip", "sketch_sessions", "sketch_packets", "sketch_bytes"}), DNS_RECORD_LOG("dns_record_log", "dns_record_log_local", new String[]{"capture_time", "recv_ip", 
"src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "dns_flag", "ttl", "protocol", "fxo_id", "req_type", "qname", "response", "dns_a", "dns_a_num", "dns_cname", "dns_cname_num", "dns_aaaa", "dns_aaaa_num", "dns_mx", "dns_mx_num", "dns_ns", "dns_ns_num"}), diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java index 3038914..c61fdaa 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java @@ -1,6 +1,7 @@ package com.zdjizhi.etl.connection; import cn.hutool.core.convert.Convert; +import com.alibaba.fastjson.util.TypeUtils; import com.zdjizhi.etl.LogService; import com.zdjizhi.etl.dns.SketchTimeMapFunction; import com.zdjizhi.utils.kafka.KafkaConsumer; @@ -85,7 +86,7 @@ public class ConnLogService { private static DataStream> getSketchTransformStream(DataStream> sketchSource) throws Exception { DataStream> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) - .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000)) + .withTimestampAssigner((event, timestamp) -> TypeUtils.castToLong(event.get("sketch_start_time")) * 1000)) .keyBy(new IpKeysSelector()) .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) .process(new SketchProcessFunction()); diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java new file mode 100644 index 0000000..e469ac1 --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java @@ -0,0 +1,162 @@ +//package com.zdjizhi.etl.connection; +// +//import cn.hutool.core.convert.Convert; +//import cn.hutool.core.util.StrUtil; +//import com.zdjizhi.etl.LogService; +//import 
com.zdjizhi.etl.dns.SketchTimeMapFunction; +//import com.zdjizhi.utils.kafka.KafkaConsumer; +//import org.apache.flink.api.common.eventtime.WatermarkStrategy; +//import org.apache.flink.api.common.functions.MapFunction; +//import org.apache.flink.api.java.utils.ParameterTool; +//import org.apache.flink.streaming.api.datastream.DataStream; +//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +//import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +//import org.apache.flink.streaming.api.windowing.time.Time; +//import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink; +//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings; +//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst; +// +//import java.time.Duration; +//import java.util.*; +// +//import static com.zdjizhi.common.FlowWriteConfig.*; +// +// +//public class ConnLogService2 { +// +// public static String convertToCsv(Map map) { +// List list = new ArrayList<>(); +// list.add("("); +// for (Map.Entry m : map.entrySet()) { +// if (m.getValue() instanceof String) { +// list.add("'" + m.getValue() + "'"); +// } else { +// list.add(m.getValue()); +// } +// } +// list.add(")"); +// String join = StrUtil.join(",", list); +// return join; +// +// } +// +// public static void connLogStream(StreamExecutionEnvironment env) throws Exception { +// //connection +// DataStream> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION); +// //sketch +// DataStream> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH); +// +// //写入CKsink,批量处理 +// LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION); +// +// Map globalParameters = new HashMap<>(); +// // ClickHouse cluster properties +// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://bigdata-85:8123/"); +// globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, CK_USERNAME); +// 
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, CK_PIN); +// +// // sink common +// globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1"); +// globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/"); +// globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2"); +// globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2"); +// globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2"); +// globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false"); +// +// // set global paramaters +// ParameterTool parameters = ParameterTool.fromMap(globalParameters); +// env.getConfig().setGlobalJobParameters(parameters); +// +// // Transform 操作 +// DataStream dataStream = sketchSource.map(new MapFunction, String>() { +// @Override +// public String map(Map data) throws Exception { +// String s = convertToCsv(data); +// System.err.println(s); +// return convertToCsv(data); +// } +// }); +// +// +// // create props for sink +// Properties props = new Properties(); +// props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, CK_DATABASE + "." 
+ SINK_CK_TABLE_SKETCH); +// props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, CK_BATCH); +// ClickHouseSink sink = new ClickHouseSink(props); +// dataStream.addSink(sink); +// dataStream.print(); +// +//// LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH); +// +// //transform +// DataStream> connTransformStream = getConnTransformStream(connSource); +// +// //写入ck通联relation表 +// LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); +// +// DataStream> sketchTransformStream = getSketchTransformStream(sketchSource); +// +// //合并通联和通联sketch +// DataStream> ip2ipGraph = getConnUnion(connTransformStream, sketchTransformStream); +// +// //写入arangodb +// LogService.getLogArangoSink(ip2ipGraph, R_VISIT_IP2IP); +// +// } +// +// /** +// * 通联原始日志数据源消费kafka +// * +// * @param source +// * @return +// */ +// private static DataStream> getLogSource(StreamExecutionEnvironment env, String source) throws Exception { +// +// String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time"; +// +// DataStream> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) +// .setParallelism(SOURCE_PARALLELISM) +// .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0) +// .setParallelism(SOURCE_PARALLELISM) +// .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? 
new ConnTimeMapFunction() : new SketchTimeMapFunction()) +// .setParallelism(SOURCE_PARALLELISM) +// .name(source) +// .setParallelism(SOURCE_PARALLELISM); +// return sourceStream; +// } +// +// private static DataStream> getConnTransformStream(DataStream> connSource) throws Exception { +// DataStream> connTransformStream = connSource +// .assignTimestampsAndWatermarks(WatermarkStrategy +// .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) +// .withTimestampAssigner((event, timestamp) -> { +// return Convert.toLong(event.get("conn_start_time")) * 1000; +// })) +// .setParallelism(TRANSFORM_PARALLELISM) +// .keyBy(new IpKeysSelector()) +// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) +// .process(new ConnProcessFunction()) +// .setParallelism(TRANSFORM_PARALLELISM); +// return connTransformStream; +// } +// +// private static DataStream> getSketchTransformStream(DataStream> sketchSource) throws Exception { +// DataStream> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy +// .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) +// .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000)) +// .keyBy(new IpKeysSelector()) +// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) +// .process(new SketchProcessFunction()); +// return sketchTransformStream; +// } +// +// private static DataStream> getConnUnion(DataStream> connTransformStream, DataStream> sketchTransformStream) throws Exception { +// DataStream> ip2ipGraph = connTransformStream.union(sketchTransformStream) +// .keyBy(new IpKeysSelector()) +// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) +// .process(new Ip2IpGraphProcessFunction()) +// .setParallelism(TRANSFORM_PARALLELISM); +// return ip2ipGraph; +// } +// +//} diff --git 
a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java index 40afc63..bdad72a 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java @@ -4,13 +4,14 @@ import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.util.TypeUtils; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; @@ -28,7 +29,7 @@ public class ConnProcessFunction extends ProcessWindowFunction values = connAggregate(elements); if (values != null) { - Map result = new LinkedHashMap<>(); + Map result = new HashMap<>(); result.put("start_time", values.f0); result.put("end_time", values.f1); result.put("src_ip", keys.f0); @@ -45,18 +46,24 @@ public class ConnProcessFunction extends ProcessWindowFunction connAggregate(Iterable> elements) { - long sessions = 0; - long packets = 0; - long bytes = 0; + long sessions = 0L; + long packets = 0L; + long bytes = 0L; long startTime = DateUtil.currentSeconds(); long endTime = DateUtil.currentSeconds(); try { for (Map newSketchLog : elements) { long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time")); - if(connStartTimetime>0){ + if (connStartTimetime > 0) { sessions++; - packets = packets + Convert.toLong(newSketchLog.get("total_cs_pkts")) + Convert.toLong(newSketchLog.get("total_sc_pkts")); - bytes = bytes + Convert.toLong(newSketchLog.get("total_cs_bytes")) + 
Convert.toLong(newSketchLog.get("total_sc_bytes")); +            Long totalCsPkts = TypeUtils.castToLong(newSketchLog.get("total_cs_pkts")); +            Long totalScPkts = TypeUtils.castToLong(newSketchLog.get("total_sc_pkts")); +            packets = packets + (totalCsPkts < Long.MAX_VALUE ? totalCsPkts : 0) + (totalScPkts < Long.MAX_VALUE ? totalScPkts : 0); + +            Long totalCsBytes = TypeUtils.castToLong(newSketchLog.get("total_cs_bytes")); +            Long totalScBytes = TypeUtils.castToLong(newSketchLog.get("total_sc_bytes")); +            bytes = bytes + (totalCsBytes < Long.MAX_VALUE ? totalCsBytes : 0) + (totalScBytes < Long.MAX_VALUE ? totalScBytes : 0); + +            startTime = connStartTimetime < startTime ? connStartTimetime : startTime; endTime = connStartTimetime > endTime ? connStartTimetime : endTime; } } diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java index 8862da6..31957dc 100644 --- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java @@ -4,13 +4,14 @@ import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.util.TypeUtils; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; @@ -41,7 +42,7 @@ public class SketchProcessFunction extends ProcessWindowFunction values = connAggregate(elements); try { if (values != null) { - Map result = new LinkedHashMap<>(); + Map result = new HashMap<>(); result.put("start_time", values.f0); result.put("end_time", 
values.f1); result.put("src_ip", keys.f0); @@ -59,20 +60,20 @@ public class SketchProcessFunction extends ProcessWindowFunction connAggregate(Iterable> elements) { - long sessions = 0; - long packets = 0; - long bytes = 0; + long sessions = 0L; + long packets = 0L; + long bytes = 0L; long startTime = DateUtil.currentSeconds(); long endTime = DateUtil.currentSeconds(); try { for (Map newSketchLog : elements) { - long connStartTimetime = Convert.toLong(newSketchLog.get("sketch_start_time")); - if(connStartTimetime>0){ - sessions += Convert.toLong(newSketchLog.get("sketch_sessions")); - packets += Convert.toLong(newSketchLog.get("sketch_packets")); - bytes += Convert.toLong(newSketchLog.get("sketch_bytes")); - startTime = connStartTimetime < startTime ? connStartTimetime : startTime; - endTime = connStartTimetime > endTime ? connStartTimetime : endTime; + long connStartTime = Convert.toLong(newSketchLog.get("sketch_start_time")); + if (connStartTime > 0) { + sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions")) < Long.MAX_VALUE ? TypeUtils.castToLong(newSketchLog.get("sketch_sessions")) : 0; + packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets")) < Long.MAX_VALUE ? TypeUtils.castToLong(newSketchLog.get("sketch_packets")) : 0; + bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes")) < Long.MAX_VALUE ? TypeUtils.castToLong(newSketchLog.get("sketch_bytes")) : 0; + startTime = connStartTime < startTime ? connStartTime : startTime; + endTime = connStartTime > endTime ? 
connStartTime : endTime; } } return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes); diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java index 894867f..8744060 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java @@ -9,7 +9,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.LOG_AGGREGATE_DURATION; @@ -45,7 +45,7 @@ public class DnsRelationProcessFunction extends ProcessWindowFunction endTime ? logStartTime : endTime; } } - Map newDns = new LinkedHashMap<>(); + Map newDns = new HashMap<>(); newDns.put("start_time", startTime); newDns.put("end_time", endTime + LOG_AGGREGATE_DURATION); newDns.put("record_type", keys.f0); diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java index 1ab45f8..0558dae 100644 --- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java +++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java @@ -65,7 +65,7 @@ public class ClickhouseSink extends RichSinkFunction>> try { StopWatch stopWatch = new StopWatch(); stopWatch.start(); - log.info("开始写入ck数据 :{}", data.size()); + log.debug("开始写入ck数据 :{}", data.size()); boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); @@ -86,7 +86,7 @@ public class ClickhouseSink extends RichSinkFunction>> connection.commit(); connection.setAutoCommit(autoCommit); stopWatch.stop(); - log.info("总共花费时间 {}", stopWatch.getTime()); + log.debug("总共花费时间 {}", stopWatch.getTime()); } catch (Exception ex) { 
log.error("ClickhouseSink插入报错", ex); }