diff --git a/pom.xml b/pom.xml index aa4dbae..b48acf1 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi knowledge-log - 20220819 + 20220901 log-completion-schema http://www.example.com @@ -235,6 +235,11 @@ HikariCP 3.2.0 --> + + + + + diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 3f47eda..4214242 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,25 +1,25 @@ #--------------------------------\u5730\u5740\u914D\u7F6E------------------------------# #\u7BA1\u7406kafka\u5730\u5740,\u591A\u53F0\u9017\u53F7\u8FDE\u63A5ip1:9094,ip2:9094 -source.kafka.servers=192.168.44.85:9094,192.168.44.86:9094,192.168.44.87:9094 +source.kafka.servers=192.168.44.12:9094 #\u7BA1\u7406\u8F93\u51FAkafka\u5730\u5740 -sink.kafka.servers= +sink.kafka.servers=192.168.44.12:9094 #--------------------------------HTTP/\u5B9A\u4F4D\u5E93/ssl------------------------------# tools.library= #--------------------------------Kafka\u6D88\u8D39/\u751F\u4EA7\u914D\u7F6E------------------------------# #\u8BFB\u53D6topic,\u5B58\u50A8\u8BE5spout id\u7684\u6D88\u8D39offset\u4FE1\u606F\uFF0C\u53EF\u901A\u8FC7\u8BE5\u62D3\u6251\u547D\u540D;\u5177\u4F53\u5B58\u50A8offset\u7684\u4F4D\u7F6E\uFF0C\u786E\u5B9A\u4E0B\u6B21\u8BFB\u53D6\u4E0D\u91CD\u590D\u7684\u6570\u636E\uFF1B -group.id=KNOWLEDGE-GROUP5 +group.id=KNOWLEDGE-GROUPtest #--------------------------------topology\u914D\u7F6E------------------------------# #consumer \u5E76\u884C\u5EA6 source.parallelism=1 #\u8F6C\u6362\u51FD\u6570\u5E76\u884C\u5EA6 transform.parallelism=1 #kafka producer \u5E76\u884C\u5EA6 -sink.parallelism=3 +sink.parallelism=1 #--------------------------------\u4E1A\u52A1\u914D\u7F6E------------------------------# #1 connection\u65E5\u5FD7 \uFF0C2 dns\u65E5\u5FD7 -log.type=2 +log.type=1 #\u751F\u4EA7\u8005\u538B\u7F29\u6A21\u5F0F none or snappy producer.kafka.compression.type=none @@ -27,6 +27,10 @@ producer.kafka.compression.type=none source.kafka.topic.connection=CONNECTION-RECORD-LOG source.kafka.topic.sketch=CONNECTION-SKETCH-RECORD-LOG source.kafka.topic.dns=DNS-RECORD-LOG +#kafka\u56DE\u5199\u7EDF\u8BA1\u6570\u636E +sink.kafka.topic.relation.connection=CONNECTION-RELATION-LOG +sink.kafka.topic.relation.dns=DNS-RELATION-LOG + #\u5199\u5165clickhouse\u672C\u5730\u8868 sink.ck.table.connection=connection_record_log_local sink.ck.table.sketch=connection_sketch_record_log_local @@ -43,7 +47,8 @@ sink.arangodb.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN #\u4F7F\u7528flink\u5165\u5E93\u539F\u59CB\u65E5\u5FD70\uFF1A\u5426\uFF0C1\uFF1A\u662F sink.ck.raw.log.insert.open=1 #clickhouse\u914D\u7F6E\uFF0C\u591A\u4E2A\u9017\u53F7\u8FDE\u63A5 ip1:8123,ip2:8123 -ck.hosts=192.168.44.85:8123,192.168.44.86:8123,192.168.44.87:8123 +ck.hosts=192.168.44.12:8123 +# ,192.168.44.86:8123,192.168.44.87:8123 ck.database=tsg_galaxy_v3 ck.username=tsg_insert ck.pin=galaxy2019 @@ -51,19 +56,19 @@ ck.pin=galaxy2019 ck.connection.timeout=10000 ck.socket.timeout=600000 #clickhouse\u5165\u5E93\u6279\u91CF\u5355\u4F4D\u6761 -ck.batch=100000 +ck.batch=20000 #clickhouse\u5165\u5E93\u524D\u79EF\u7D2F\u6279\u91CF\u65F6\u95F4\u5355\u4F4D\u6BEB\u79D2 -sink.ck.batch.delay.time=30000 +sink.ck.batch.delay.time=3000 #flink \u65E5\u5FD7\u5EF6\u8FDF\u8D85\u65F6\u65F6\u95F4 flink.watermark.max.delay.time=60 #ck relation\u7EDF\u8BA1\u65F6\u95F4\u95F4\u9694 \u5355\u4F4Ds -log.aggregate.duration=30 +log.aggregate.duration=10 #arangodb \u7EDF\u8BA1\u65F6\u95F4\u95F4\u9694 \u5355\u4F4Ds 
-log.aggregate.duration.graph=600 +log.aggregate.duration.graph=30 #arangoDB\u53C2\u6570\u914D\u7F6E -arangodb.host=192.168.44.83 +arangodb.host=192.168.44.12 arangodb.port=8529 arangodb.user=root arangodb.password=galaxy_2019 @@ -72,4 +77,6 @@ arangodb.batch=10000 arangodb.ttl=3600 arangodb.thread.pool.number=10 #\u6279\u91CF\u7D2F\u8BA1\u65F6\u95F4\u5355\u4F4D\u6BEB\u79D2ms -sink.arangodb.batch.delay.time=1000 \ No newline at end of file +sink.arangodb.batch.delay.time=1000 + +aggregate.max.value.length=18 \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java index 7889c88..59c059e 100644 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -106,7 +106,8 @@ public class FlowWriteConfig { ; public static final String SOURCE_KAFKA_TOPIC_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.connection"); public static final String SOURCE_KAFKA_TOPIC_SKETCH = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.sketch"); - + public static final String SINK_KAFKA_TOPIC_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic.relation.connection"); + public static final String SINK_KAFKA_TOPIC_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic.relation.dns"); //sink.ck.table public static final String SINK_CK_TABLE_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.connection"); public static final String SINK_CK_TABLE_SKETCH = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.sketch"); @@ -114,6 +115,7 @@ public class FlowWriteConfig { public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection"); public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns"); + public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.visit.ip2ip"); public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.cname.domain2domain"); public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.mx.domain2domain"); @@ -134,4 +136,5 @@ public class FlowWriteConfig { public static final Integer SINK_ARANGODB_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.arangodb.batch.delay.time"); public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch"); public static final Integer SINK_CK_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.ck.raw.log.insert.open"); + public static final Integer AGGREGATE_MAX_VALUE_LENGTH = FlowWriteConfigurations.getIntProperty(0, "aggregate.max.value.length"); } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/enums/LogMetadata.java b/src/main/java/com/zdjizhi/enums/LogMetadata.java index 576b846..7c501b5 100644 --- a/src/main/java/com/zdjizhi/enums/LogMetadata.java +++ b/src/main/java/com/zdjizhi/enums/LogMetadata.java @@ -18,12 +18,7 @@ public enum LogMetadata { * 日志名称,表名,字段 * */ - CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{ - "cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status", - "dir_status", 
"total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy", - "user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent", - "http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num", - "imei", "imsi"}), + CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{"cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status", "dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy", "user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent", "http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num", "imei", "imsi"}), CONNECTION_RELATION_LOG("connection_relation_log", "connection_relation_log_local", new String[]{"start_time", "end_time", "src_ip", "dst_ip", "sessions", "packets", "bytes"}), CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log_local", new String[]{"sled_ip", "sketch_start_time", "sketch_duration", "src_ip", "dst_ip", "sketch_sessions", "sketch_packets", "sketch_bytes"}), DNS_RECORD_LOG("dns_record_log", "dns_record_log_local", new String[]{"capture_time", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "dns_flag", "ttl", "protocol", "fxo_id", "req_type", "qname", "response", "dns_a", "dns_a_num", "dns_cname", "dns_cname_num", "dns_aaaa", "dns_aaaa_num", "dns_mx", "dns_mx_num", "dns_ns", "dns_ns_num"}), diff --git a/src/main/java/com/zdjizhi/etl/LogService.java b/src/main/java/com/zdjizhi/etl/LogService.java index 56989b1..052a8b3 100644 --- a/src/main/java/com/zdjizhi/etl/LogService.java +++ b/src/main/java/com/zdjizhi/etl/LogService.java @@ -1,8 +1,10 @@ package com.zdjizhi.etl; +import cn.hutool.json.JSONUtil; import com.zdjizhi.etl.connection.ArangodbBatchIPWindow; import com.zdjizhi.utils.arangodb.ArangoDBSink; -import com.zdjizhi.utils.ck.ClickhouseSink; +import com.zdjizhi.utils.ck.CKSink; +import com.zdjizhi.utils.kafka.KafkaProducer; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; @@ -12,21 +14,23 @@ import java.util.Map; import static com.zdjizhi.common.FlowWriteConfig.*; -public interface LogService { +public class LogService { - public static void getLogCKSink(DataStream> sourceStream, String sink) throws Exception{ - sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) - .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime)) - .apply(new CKBatchWindow()) - .addSink(new ClickhouseSink(sink)) - .setParallelism(SINK_PARALLELISM) - .name(sink) - .setParallelism(SINK_PARALLELISM); +// @Deprecated +// public static void getLogCKSink3(DataStream> sourceStream, String sink) throws Exception { +// +// sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME))) +// .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime)) +// .apply(new CKBatchWindow()) +// .addSink(new ClickhouseSink(sink)) +// .setParallelism(SINK_PARALLELISM) +// .name(sink) +// .setParallelism(SINK_PARALLELISM); +// 
+// } - } - - public static void getLogArangoSink(DataStream> sourceStream, String sink) throws Exception{ + public static void getLogArangoSink(DataStream> sourceStream, String sink) throws Exception { sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME))) .trigger(new CountTriggerWithTimeout<>(sink, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime)) .apply(new ArangodbBatchIPWindow()) @@ -35,4 +39,21 @@ public interface LogService { .name(sink) .setParallelism(SINK_PARALLELISM); } -} + + public static void getLogKafkaSink(DataStream> sourceStream, String sink) throws Exception { + sourceStream.map(JSONUtil::toJsonStr) + .setParallelism(SINK_PARALLELISM) + .addSink(KafkaProducer.getKafkaProducer(sink)) + .setParallelism(SINK_PARALLELISM) + .name(sink) + .setParallelism(SINK_PARALLELISM); + } + + public static void getLogCKSink(DataStream> sourceStream, String sink) throws Exception { + sourceStream.addSink(new CKSink(sink)) + .setParallelism(SINK_PARALLELISM) + .name(sink) + .setParallelism(SINK_PARALLELISM); + + } +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java index c61fdaa..b0fff6b 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java @@ -7,8 +7,9 @@ import com.zdjizhi.etl.dns.SketchTimeMapFunction; import com.zdjizhi.utils.kafka.KafkaConsumer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; @@ -20,22 +21,23 @@ import static com.zdjizhi.common.FlowWriteConfig.*; public class ConnLogService { - public static void connLogStream(StreamExecutionEnvironment env) throws Exception{ + public static void connLogStream(StreamExecutionEnvironment env) throws Exception { //connection DataStream> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION); //sketch DataStream> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH); - - //写入CKsink,批量处理 - LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION); - - LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH); - //transform DataStream> connTransformStream = getConnTransformStream(connSource); - //写入ck通联relation表 - LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); + if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) { + //写入CKsink,批量处理 + LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION); + LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH); + //写入ck通联relation表 + LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); + } else { + LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION); + } DataStream> sketchTransformStream = getSketchTransformStream(sketchSource); @@ -57,14 +59,31 @@ public class ConnLogService { String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? 
"conn_start_time" : "sketch_start_time"; - DataStream> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) + SingleOutputStreamOperator> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) .setParallelism(SOURCE_PARALLELISM) - .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0) + .filter(x -> { + if (Objects.isNull(x) || Convert.toLong(x.get(timeFilter)) <= 0) { + return false; + } + if (SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)) { + if (String.valueOf(x.get("total_cs_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH || + String.valueOf(x.get("total_cs_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) { + return false; + } + return true; + } else if (SOURCE_KAFKA_TOPIC_SKETCH.equals(source)) { + if (String.valueOf(x.get("sketch_sessions")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("sketch_packets")).length() >= AGGREGATE_MAX_VALUE_LENGTH || + String.valueOf(x.get("sketch_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) { + return false; + } + return true; + } else { + return false; + } + }).setParallelism(SOURCE_PARALLELISM); + DataStream> sourceStream = filterStream.map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction()) .setParallelism(SOURCE_PARALLELISM) - .map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction()) - .setParallelism(SOURCE_PARALLELISM) - .name(source) - .setParallelism(SOURCE_PARALLELISM); + .name(source); return sourceStream; } @@ -77,7 +96,7 @@ public class ConnLogService { })) .setParallelism(TRANSFORM_PARALLELISM) .keyBy(new IpKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) .process(new ConnProcessFunction()) .setParallelism(TRANSFORM_PARALLELISM); return connTransformStream; @@ -88,7 +107,7 @@ public class ConnLogService { .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) .withTimestampAssigner((event, timestamp) -> TypeUtils.castToLong(event.get("sketch_start_time")) * 1000)) .keyBy(new IpKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) .process(new SketchProcessFunction()); return sketchTransformStream; } @@ -96,7 +115,7 @@ public class ConnLogService { private static DataStream> getConnUnion(DataStream> connTransformStream, DataStream> sketchTransformStream) throws Exception { DataStream> ip2ipGraph = connTransformStream.union(sketchTransformStream) .keyBy(new IpKeysSelector()) - .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) + .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) .process(new Ip2IpGraphProcessFunction()) .setParallelism(TRANSFORM_PARALLELISM); return ip2ipGraph; diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java index e469ac1..b727535 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService2.java @@ -10,7 +10,7 @@ //import org.apache.flink.api.java.utils.ParameterTool; //import 
org.apache.flink.streaming.api.datastream.DataStream; //import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -//import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +//import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; //import org.apache.flink.streaming.api.windowing.time.Time; //import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink; //import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings; @@ -134,7 +134,7 @@ // })) // .setParallelism(TRANSFORM_PARALLELISM) // .keyBy(new IpKeysSelector()) -// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) +// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) // .process(new ConnProcessFunction()) // .setParallelism(TRANSFORM_PARALLELISM); // return connTransformStream; @@ -145,7 +145,7 @@ // .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) // .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000)) // .keyBy(new IpKeysSelector()) -// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) +// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) // .process(new SketchProcessFunction()); // return sketchTransformStream; // } @@ -153,7 +153,7 @@ // private static DataStream> getConnUnion(DataStream> connTransformStream, DataStream> sketchTransformStream) throws Exception { // DataStream> ip2ipGraph = connTransformStream.union(sketchTransformStream) // .keyBy(new IpKeysSelector()) -// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) +// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) // .process(new Ip2IpGraphProcessFunction()) // .setParallelism(TRANSFORM_PARALLELISM); // return ip2ipGraph; diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java index bdad72a..91cb47f 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java @@ -56,16 +56,13 @@ public class ConnProcessFunction extends ProcessWindowFunction 0) { sessions++; - Long totalCsPkts = TypeUtils.castToLong(newSketchLog.get("total_cs_pkts")); - Long totalScPkts = TypeUtils.castToLong(newSketchLog.get("total_sc_pkts")); - packets = packets + totalCsPkts < Long.MAX_VALUE ? totalCsPkts : 0 + totalScPkts < Long.MAX_VALUE ? totalScPkts : 0; - - Long totalCsBytes = TypeUtils.castToLong(newSketchLog.get("total_cs_bytes")); - Long totalScBytes = TypeUtils.castToLong(newSketchLog.get("total_sc_bytes")); - bytes = bytes + totalCsBytes< Long.MAX_VALUE ? totalCsBytes : 0 + totalScBytes< Long.MAX_VALUE ? totalScBytes : 0; + packets = packets + TypeUtils.castToLong(newSketchLog.get("total_cs_pkts")) + TypeUtils.castToLong(newSketchLog.get("total_sc_pkts")); + bytes = bytes + TypeUtils.castToLong(newSketchLog.get("total_cs_bytes")) + TypeUtils.castToLong(newSketchLog.get("total_sc_bytes")); startTime = connStartTimetime < startTime ? connStartTimetime : startTime; endTime = connStartTimetime > endTime ? connStartTimetime : endTime; + packets = packets > Long.MAX_VALUE ? 0 : packets; + bytes = bytes > Long.MAX_VALUE ? 
0 : bytes; } } return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes); diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java new file mode 100644 index 0000000..e38517a --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/connection/ConnReduceFunction.java @@ -0,0 +1,80 @@ +package com.zdjizhi.etl.connection; + +import cn.hutool.core.convert.Convert; +import cn.hutool.core.date.DateUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.util.TypeUtils; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.tuple.Tuple5; + +import java.util.Map; + + +/** + * @author 94976 + */ +public class ConnReduceFunction implements ReduceFunction> { + + private static final Log logger = LogFactory.get(); + + // +// public void process(Tuple2 keys, Context context, Iterable> elements, Collector> out) { +// try { +// Tuple5 values = sum(elements); +// if (values != null) { +// Map result = new HashMap<>(); +// result.put("start_time", values.f0); +// result.put("end_time", values.f1); +// result.put("src_ip", keys.f0); +// result.put("dst_ip", keys.f1); +// result.put("sessions", values.f2); +// result.put("packets", values.f3); +// result.put("bytes", values.f4); +// out.collect(result); +// logger.debug("获取中间聚合结果:{}", result.toString()); +// } +// } catch (Exception e) { +// logger.error("获取中间聚合结果失败,middleResult: {}", e); +// } +// } +// + private Tuple5 sum(Map map1, Map map2) { + + try { + long sessions = 0L; + long packets = 0L; + long bytes = 0L; + long startTime = DateUtil.currentSeconds(); + long endTime = DateUtil.currentSeconds(); + + long connStartTime1 = Convert.toLong(map1.get("conn_start_time")); + long connStartTime2 = Convert.toLong(map2.get("conn_start_time")); + if (connStartTime1 > 0 && connStartTime2 > 0) { + sessions++; + packets = TypeUtils.castToLong(map1.get("total_cs_pkts")) + TypeUtils.castToLong(map1.get("total_sc_pkts")) + + TypeUtils.castToLong(map2.get("total_cs_pkts")) + TypeUtils.castToLong(map2.get("total_sc_pkts")); + + bytes = bytes + TypeUtils.castToLong(map1.get("total_cs_bytes")) + TypeUtils.castToLong(map1.get("total_sc_bytes")) + + TypeUtils.castToLong(map2.get("total_cs_bytes")) + TypeUtils.castToLong(map2.get("total_sc_bytes")); + + startTime = connStartTime1 < connStartTime2 ? connStartTime1 : connStartTime2; + endTime = connStartTime2 < connStartTime1 ? connStartTime1 : connStartTime2; + + packets = packets > Long.MAX_VALUE ? 0 : packets; + bytes = bytes > Long.MAX_VALUE ? 0 : bytes; + + } + + } catch (Exception e) { + logger.error("聚合中间结果集失败 {}", e); + } + return null; + } + + @Override + public Map reduce(Map map1, Map map2) throws Exception { + + return null; + } +} diff --git a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java index 31957dc..e2dd3f7 100644 --- a/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java @@ -69,11 +69,15 @@ public class SketchProcessFunction extends ProcessWindowFunction newSketchLog : elements) { long connStartTime = Convert.toLong(newSketchLog.get("sketch_start_time")); if (connStartTime > 0) { - sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions")) < Long.MAX_VALUE ? 
TypeUtils.castToLong(newSketchLog.get("sketch_sessions")) : 0; - packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets")) < Long.MAX_VALUE ? TypeUtils.castToLong(newSketchLog.get("sketch_packets")) : 0; - bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes")) < Long.MAX_VALUE ? TypeUtils.castToLong(newSketchLog.get("sketch_bytes")) : 0; + sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions")); + packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets")); + bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes")); startTime = connStartTime < startTime ? connStartTime : startTime; endTime = connStartTime > endTime ? connStartTime : endTime; + + sessions = sessions > Long.MAX_VALUE ? 0 : sessions; + packets = packets > Long.MAX_VALUE ? 0 : packets; + bytes = bytes > Long.MAX_VALUE ? 0 : bytes; } } return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes); diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java index d8b3a36..cde34c1 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java @@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.Map; @@ -28,7 +28,7 @@ public class DnsGraphProcessFunction extends ProcessWindowFunction tmpTime ? startTime : tmpTime; } - Map newLog = new LinkedHashMap<>(); + Map newLog = new HashMap<>(); newLog.put("record_type", keys.f0); newLog.put("qname", keys.f1); newLog.put("record", keys.f2); diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java index 63f96e3..b4ab915 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java @@ -8,6 +8,7 @@ import com.zdjizhi.utils.kafka.KafkaConsumer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; @@ -23,13 +24,16 @@ public class DnsLogService { DataStream> dnsSource = getLogSource(env, SOURCE_KAFKA_TOPIC_DNS); - //dns 原始日志 ck入库 - LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS); - DataStream> dnsTransform = getDnsTransformStream(dnsSource); - //dns 拆分后relation日志 ck入库 - LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS); + if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) { + //dns 原始日志 ck入库 + LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS); + //dns 拆分后relation日志 ck入库 + LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS); + } else { + LogService.getLogCKSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS); + } //arango 入库,按record_type分组入不同的表 DataStream> dnsGraph = dnsTransform.filter(Objects::nonNull) @@ -67,7 +71,7 @@ public class DnsLogService { .flatMap(new DnsSplitFlatMapFunction()) .setParallelism(TRANSFORM_PARALLELISM) .keyBy(new DnsGraphKeysSelector()) - 
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) + .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) .process(new DnsRelationProcessFunction()) .setParallelism(TRANSFORM_PARALLELISM); return dnsTransform; diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java index fa66f37..1da5a90 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java @@ -26,6 +26,8 @@ public class DnsMapFunction implements MapFunction, Map map(Map rawLog) throws Exception { try { rawLog.put("capture_time", TypeUtils.coverMSToS(rawLog.get("capture_time"))); + //qname ,record 转小写 + rawLog.put("qname", StringUtils.lowerCase(StrUtil.toString(rawLog.get("qname")))); if (Objects.nonNull(rawLog.get("response"))) { List> response = (List>) rawLog.get("response"); String dnsA = ""; @@ -40,7 +42,8 @@ public class DnsMapFunction implements MapFunction, Map resMap : response) { String type = StrUtil.toString(resMap.get("res_type")); - String body = StrUtil.toString(resMap.get("res_body")); + String body = StringUtils.lowerCase(StrUtil.toString(resMap.get("res_body"))); + resMap.put("res_body",body); if (DnsType.A.getCode().equals(type)) { dnsA = Joiner.on(",").skipNulls().join(dnsA, body); dnsANum++; diff --git a/src/main/java/com/zdjizhi/utils/ck/CKSink.java b/src/main/java/com/zdjizhi/utils/ck/CKSink.java new file mode 100644 index 0000000..95ef69e --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/ck/CKSink.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.zdjizhi.utils.ck; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.enums.LogMetadata; +import org.apache.commons.lang3.time.StopWatch; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import ru.yandex.clickhouse.ClickHousePreparedStatement; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH; +import static com.zdjizhi.common.FlowWriteConfig.SINK_CK_BATCH_DELAY_TIME; + +public class CKSink extends RichSinkFunction> { + private static final Log log = LogFactory.get(); + + private static final long serialVersionUID = 1L; + + private static final Log logger = LogFactory.get(); + private Connection connection; + private ClickHousePreparedStatement preparedStatement = null; + // ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP + private final List ipWithDataList; + + // 满足此时间条件写出数据 + private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L + // 插入的批次 + private final int insertCkBatchSize = CK_BATCH; // 开发测试用10条 + + private transient volatile boolean closed = false; + private transient ScheduledExecutorService scheduler; + private transient ScheduledFuture scheduledFuture; + + private static final Map logMetadataFields = new HashMap<>(); + private static final Map logMetadataSql = new HashMap<>(); + + static { + for (LogMetadata value : LogMetadata.values()) { + logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink())); + logMetadataFields.put(value.getSink(), value.getFields()); + } + } + + // 数据表名 + private String sink; + + public CKSink(String sink) { + this.sink = sink; + this.ipWithDataList = new CopyOnWriteArrayList<>(); + } + + public String getSink() { + return sink; + } + + /** + * Connects to the target database and initializes the prepared statement. + */ + @Override + public void open(Configuration parameters) throws Exception { + connection = CKUtils.getConnection(); + String sql = logMetadataSql.get(sink); + log.debug(sql); + preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql); + + if (insertCkTimeInterval != 0 && insertCkBatchSize != 1) { + this.scheduler = Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("jdbc-upsert-output-format")); + this.scheduledFuture = + this.scheduler.scheduleWithFixedDelay( + () -> { + synchronized (CKSink.this) { + if (!closed) { + try { + logger.info("ck_flush............."); + flushClose(); + } catch (Exception e) { + log.error(e); + } + } + } + }, + insertCkTimeInterval, + insertCkTimeInterval, + TimeUnit.MILLISECONDS); + } + + } + + @Override + public final synchronized void invoke(Map row, Context context) throws IOException { + ipWithDataList.add(row); + /** + * 1. 
将数据写入CK + */ + if (ipWithDataList.size() >= this.insertCkBatchSize) { + try { + flush(ipWithDataList); + logger.info("insertCkBatchSize"); + } catch (SQLException e) { + throw new RuntimeException("Preparation of JDBC statement failed.", e); + } + } + + } + + // 插入数据 + private synchronized void flush(List data) throws SQLException { + if (data.size() > 0) { + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + log.info("开始写入ck数据 :{}", data.size()); + + connection.setAutoCommit(false); + + String[] logFields = logMetadataFields.get(sink); + + for (Map map : data) { + for (int i = 0; i < logFields.length; i++) { + preparedStatement.setObject(i + 1, map.get(logFields[i])); + } + preparedStatement.addBatch(); + } + preparedStatement.executeBatch(); + connection.commit(); + preparedStatement.clearBatch(); + stopWatch.stop(); + log.info("总共花费时间 {} ms", stopWatch.getTime()); + log.info("写入ck表{},数据 {}", sink, data.size()); + ipWithDataList.clear(); + } + } + + private synchronized void flushClose() { + try { + flush(ipWithDataList); + } catch (SQLException e) { + log.error("Preparation of JDBC statement failed.", e); + } + } + + /** + * Executes prepared statement and closes all resources of this instance. + * + * @throws IOException Thrown, if the input could not be closed properly. + */ + @Override + public synchronized void close() throws IOException { + if (!closed) { + closed = true; + if (this.scheduledFuture != null) { + scheduledFuture.cancel(false); + this.scheduler.shutdown(); + } + + if (preparedStatement != null) { + flushClose(); + try { + preparedStatement.close(); + } catch (SQLException e) { + log.error("JDBC statement could not be closed: " + e.getMessage()); + } finally { + preparedStatement = null; + } + } + + try { + CKUtils.close(connection); + } catch (Exception e) { + log.error("JDBC connection could not be closed: " + e.getMessage()); + } + } + } +} + diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCOutput.java b/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCOutput.java new file mode 100644 index 0000000..40b7db6 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCOutput.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.zdjizhi.utils.ck; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.enums.LogMetadata; +import org.apache.commons.lang3.time.StopWatch; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.yandex.clickhouse.ClickHousePreparedStatement; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH; +import static com.zdjizhi.common.FlowWriteConfig.SINK_CK_BATCH_DELAY_TIME; + +public class ClickHouseJDBCOutput extends RichSinkFunction> { + private static final Log log = LogFactory.get(); + + private static final long serialVersionUID = 1L; + + private static final Logger logger = LoggerFactory.getLogger(ClickHouseJDBCOutput.class); + + private Connection connection; + private ClickHousePreparedStatement preparedStatement = null; + // ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP + private final List ipWithDataList; + + // 满足此时间条件写出数据 + private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L + // 插入的批次 + private final int insertCkBatchSize = CK_BATCH; // 开发测试用10条 + + private transient volatile boolean closed = false; + private transient ScheduledExecutorService scheduler; + private transient ScheduledFuture scheduledFuture; + + private static final Map logMetadataFields = new HashMap<>(); + private static final Map logMetadataSql = new HashMap<>(); + + static { + for (LogMetadata value : LogMetadata.values()) { + logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink())); + logMetadataFields.put(value.getSink(), value.getFields()); + } + } + + // 数据表名 + private String sink; + + public ClickHouseJDBCOutput(String sink) { + this.sink = sink; + this.ipWithDataList = new CopyOnWriteArrayList<>(); + } + + public String getSink() { + return sink; + } + + /** + * Connects to the target database and initializes the prepared statement. + */ + @Override + public void open(Configuration parameters) throws Exception { + connection = CKUtils.getConnection(); + String sql = logMetadataSql.get(sink); + log.debug(sql); + preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql); + + if (insertCkTimeInterval != 0 && insertCkBatchSize != 1) { + this.scheduler = Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("jdbc-upsert-output-format")); + this.scheduledFuture = + this.scheduler.scheduleWithFixedDelay( + () -> { + synchronized (ClickHouseJDBCOutput.this) { + if (!closed) { + try { + logger.info("ck_flush............."); + flushClose(); + } catch (Exception e) { +// flushException = e; + log.error(e); + } + } + } + }, + insertCkTimeInterval, + insertCkTimeInterval, + TimeUnit.MILLISECONDS); + } + + } + + @Override + public final synchronized void invoke(Map row, Context context) throws IOException { + ipWithDataList.add(row); + /** + * 1. 
将数据写入CK + */ + if (ipWithDataList.size() >= this.insertCkBatchSize) { + try { + flush(ipWithDataList); + logger.info("insertCkBatchSize"); + } catch (SQLException e) { + throw new RuntimeException("Preparation of JDBC statement failed.", e); + } + } + + } + + // 插入数据 + private synchronized void flush(List data) throws SQLException { + if (data.size() > 0) { +// checkFlushException(); + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + log.info("开始写入ck数据 :{}", data.size()); + + connection.setAutoCommit(false); + + String[] logFields = logMetadataFields.get(sink); + + for (Map map : data) { + for (int i = 0; i < logFields.length; i++) { + preparedStatement.setObject(i + 1, map.get(logFields[i])); + } + preparedStatement.addBatch(); + } + preparedStatement.executeBatch(); + connection.commit(); + preparedStatement.clearBatch(); + stopWatch.stop(); + log.info("总共花费时间 {} ms", stopWatch.getTime()); + log.info("写入ck表{},数据 {}", sink, data.size()); + ipWithDataList.clear(); + } + } + + private synchronized void flushClose() { + try { + flush(ipWithDataList); + } catch (SQLException e) { + log.error("Preparation of JDBC statement failed.", e); + } + } + + /** + * Executes prepared statement and closes all resources of this instance. + * + * @throws IOException Thrown, if the input could not be closed properly. + */ + @Override + public synchronized void close() throws IOException { + if (!closed) { + closed = true; + + if (this.scheduledFuture != null) { + scheduledFuture.cancel(false); + this.scheduler.shutdown(); + } + + if (preparedStatement != null) { + flushClose(); + try { + preparedStatement.close(); + } catch (SQLException e) { + log.error("JDBC statement could not be closed: " + e.getMessage()); + } finally { + preparedStatement = null; + } + } + + try { + CKUtils.close(connection); + } catch (Exception e) { + log.error("JDBC connection could not be closed: " + e.getMessage()); + } + } + } +} + diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java index 0558dae..54070d8 100644 --- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java +++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java @@ -4,7 +4,6 @@ import cn.hutool.core.io.IoUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zdjizhi.enums.LogMetadata; -import org.apache.commons.lang3.time.StopWatch; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import ru.yandex.clickhouse.ClickHousePreparedStatement; @@ -63,11 +62,10 @@ public class ClickhouseSink extends RichSinkFunction>> public void executeInsert(List> data, String tableName) { try { - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); +// StopWatch stopWatch = new StopWatch(); +// stopWatch.start(); log.debug("开始写入ck数据 :{}", data.size()); - boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); String[] logFields = logMetadataFields.get(tableName); @@ -81,12 +79,11 @@ public class ClickhouseSink extends RichSinkFunction>> } preparedStatement.addBatch(); } - preparedStatement.executeBatch(); connection.commit(); - connection.setAutoCommit(autoCommit); - stopWatch.stop(); - log.debug("总共花费时间 {}", stopWatch.getTime()); +// stopWatch.stop(); +// log.debug("总共花费时间 {} ms", stopWatch.getTime()); + log.debug("写入ck表{},数据 {}",tableName, data.size()); } catch (Exception ex) { log.error("ClickhouseSink插入报错", ex); } diff --git 
a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink2.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink2.java new file mode 100644 index 0000000..6cf2f60 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink2.java @@ -0,0 +1,106 @@ +//package com.zdjizhi.utils.ck; +// +//import cn.hutool.core.io.IoUtil; +//import cn.hutool.log.Log; +//import cn.hutool.log.LogFactory; +//import com.zdjizhi.enums.LogMetadata; +//import org.apache.commons.lang3.time.StopWatch; +//import org.apache.flink.configuration.Configuration; +//import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +//import ru.yandex.clickhouse.ClickHousePreparedStatement; +// +//import java.sql.Connection; +//import java.util.HashMap; +//import java.util.List; +//import java.util.Map; +// +//import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH; +// +//public class ClickhouseSink2 extends RichSinkFunction>> { +// +// private static final Log log = LogFactory.get(); +// +// private Connection connection; +// private ClickHousePreparedStatement preparedStatement; +// public String sink; +// +// private static final Map logMetadataFields = new HashMap<>(); +// private static final Map logMetadataSql = new HashMap<>(); +// +// static { +// for (LogMetadata value : LogMetadata.values()) { +// logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink())); +// logMetadataFields.put(value.getSink(), value.getFields()); +// } +// } +// +// public ClickhouseSink2(String sink) { +// this.sink = sink; +// } +// +// public String getSink() { +// return sink; +// } +// +// public void setSink(String sink) { +// this.sink = sink; +// } +// +// @Override +// public void invoke(List> logs, Context context) throws Exception { +// executeInsert(logs, getSink()); +// } +// +// @Override +// public void open(Configuration parameters) throws Exception { +// connection = CKUtils.getConnection(); +// String sql = logMetadataSql.get(sink); +// log.debug(sql); +// connection.setAutoCommit(false); +// preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql); +// } +// +// @Override +// public void close() throws Exception { +// IoUtil.close(preparedStatement); +// CKUtils.close(connection); +// } +// +// public void executeInsert(List> data, String tableName) { +// +// try { +// StopWatch stopWatch = new StopWatch(); +// stopWatch.start(); +// log.info("开始写入ck数据 :{}", data.size()); +// +// String[] logFields = logMetadataFields.get(tableName); +// +// int count = 0; +// for (Map map : data) { +// for (int i = 0; i < logFields.length; i++) { +// preparedStatement.setObject(i + 1, map.get(logFields[i])); +// } +// preparedStatement.addBatch(); +// count++; +// if (count % CK_BATCH == 0) { +// preparedStatement.executeBatch(); +// connection.commit(); +// preparedStatement.clearBatch(); +// count = 0; +// } +// } +// if (count > 0) { +// preparedStatement.executeBatch(); +// connection.commit(); +// preparedStatement.clearBatch(); +// } +// +// stopWatch.stop(); +// log.info("总共花费时间 {} ms", stopWatch.getTime()); +// log.info("写入ck表{},数据 {}", tableName, data.size()); +// } catch (Exception ex) { +// log.error("ClickhouseSink插入报错", ex); +// } +// } +// +//} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java index e7ba647..1beb1fa 100644 --- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java +++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java @@ -181,7 +181,7 @@ public class 
TypeUtils { public static Object coverMSToS(Object ms) { if (StrUtil.toString(ms).length() == 13) { - return StrUtil.sub(StrUtil.toString(ms), 0, 10); + return Convert.toLong(StrUtil.sub(StrUtil.toString(ms), 0, 10)); } return ms; }
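
For illustration, a minimal, self-contained sketch of the processing-time to event-time window switch made in this patch (TumblingProcessingTimeWindows -> TumblingEventTimeWindows). This is a hedged sketch, not the project's code: the class name EventTimeWindowSketch and the Tuple2-based records are stand-ins; the point is only that event-time windows fire off watermarks, so they must be paired with a timestamp assigner such as the bounded-out-of-orderness strategy already used in getConnTransformStream / getSketchTransformStream.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

public class EventTimeWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (src_ip, conn_start_time in epoch seconds) stand-ins for the connection records
        DataStream<Tuple2<String, Long>> records = env.fromElements(
                Tuple2.of("10.0.0.1", 1660000000L),
                Tuple2.of("10.0.0.1", 1660000005L),
                Tuple2.of("10.0.0.2", 1660000020L));

        records
                // Event-time windows only close when the watermark passes their end,
                // so a timestamp assigner plus an out-of-orderness bound is mandatory here.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                                .withTimestampAssigner((event, previous) -> event.f1 * 1000))
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // keep the latest timestamp per key within each 10 s window
                .reduce((a, b) -> Tuple2.of(a.f0, Math.max(a.f1, b.f1)))
                .print();

        env.execute("event-time window sketch");
    }
}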
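
A second minimal sketch, also assumption-laden (the class and method names SafeAggregationSketch, safeAdd and withinLength are hypothetical): the overflow guard that the packets/bytes/sessions accumulation and the new aggregate.max.value.length filter are concerned with. Since a primitive long can never compare greater than Long.MAX_VALUE, an after-the-fact "> Long.MAX_VALUE" comparison never fires; Math.addExact (or a negative wrap-around check) is one way to detect the overflow at the moment it happens.

/**
 * Illustrative only: overflow-aware accumulation of connection counters.
 * Wrap-around is caught at addition time with Math.addExact instead of being
 * checked against Long.MAX_VALUE afterwards.
 */
public class SafeAggregationSketch {

    /** Hypothetical stand-in for the new aggregate.max.value.length property. */
    private static final int MAX_VALUE_LENGTH = 18;

    /** Adds two non-negative counters, resetting to 0 if the sum would overflow. */
    static long safeAdd(long current, long delta) {
        try {
            return Math.addExact(current, delta);
        } catch (ArithmeticException overflow) {
            return 0L;
        }
    }

    /** Digit-length pre-filter, mirroring the check applied to the raw Kafka fields. */
    static boolean withinLength(Object rawValue) {
        return String.valueOf(rawValue).length() < MAX_VALUE_LENGTH;
    }

    public static void main(String[] args) {
        long packets = safeAdd(Long.MAX_VALUE - 5, 10);  // overflows -> resets to 0
        long bytes = safeAdd(1000L, 2000L);              // normal case -> 3000
        System.out.println(packets + " / " + bytes);

        System.out.println(withinLength(12345L));                  // true
        System.out.println(withinLength("999999999999999999999")); // false, 21 digits
    }
}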