From 3d418a58de359dff5808cc5b10c9b01dbf76ed07 Mon Sep 17 00:00:00 2001 From: zhanghongqing Date: Wed, 9 Nov 2022 10:10:52 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E6=97=A5=E5=BF=97=E5=85=A5=E5=BA=93=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=E9=80=89=E6=8B=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- properties/default_config.properties | 2 +- properties/service_flow_config.properties | 30 +++++++++---------- .../etl/connection/ConnLogService.java | 8 ++--- .../com/zdjizhi/etl/dns/DnsLogService.java | 10 +++---- .../com/zdjizhi/utils/arangodb/AGSink.java | 4 +-- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/properties/default_config.properties b/properties/default_config.properties index 74034d3..47e0a7a 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -12,7 +12,7 @@ max.partition.fetch.bytes=31457280 retries=0 #\u4ED6\u7684\u542B\u4E49\u5C31\u662F\u8BF4\u4E00\u4E2ABatch\u88AB\u521B\u5EFA\u4E4B\u540E\uFF0C\u6700\u591A\u8FC7\u591A\u4E45\uFF0C\u4E0D\u7BA1\u8FD9\u4E2ABatch\u6709\u6CA1\u6709\u5199\u6EE1\uFF0C\u90FD\u5FC5\u987B\u53D1\u9001\u51FA\u53BB\u4E86 -linger.ms=10 +linger.ms=30 #\u5982\u679C\u5728\u8D85\u65F6\u4E4B\u524D\u672A\u6536\u5230\u54CD\u5E94\uFF0C\u5BA2\u6237\u7AEF\u5C06\u5728\u5FC5\u8981\u65F6\u91CD\u65B0\u53D1\u9001\u8BF7\u6C42 request.timeout.ms=30000 diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 1129937..c742cd2 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,14 +1,14 @@ #--------------------------------\u5730\u5740\u914D\u7F6E------------------------------# #\u7BA1\u7406kafka\u5730\u5740,\u591A\u53F0\u9017\u53F7\u8FDE\u63A5ip1:9094,ip2:9094 -source.kafka.servers=192.168.44.85:9094 +source.kafka.servers=192.168.44.12:9092 #\u7BA1\u7406\u8F93\u51FAkafka\u5730\u5740 -sink.kafka.servers=192.168.44.85:9094 +sink.kafka.servers=192.168.44.12:9092 #--------------------------------HTTP/\u5B9A\u4F4D\u5E93/ssl------------------------------# tools.library= #--------------------------------Kafka\u6D88\u8D39/\u751F\u4EA7\u914D\u7F6E------------------------------# #\u8BFB\u53D6topic,\u5B58\u50A8\u8BE5spout id\u7684\u6D88\u8D39offset\u4FE1\u606F\uFF0C\u53EF\u901A\u8FC7\u8BE5\u62D3\u6251\u547D\u540D;\u5177\u4F53\u5B58\u50A8offset\u7684\u4F4D\u7F6E\uFF0C\u786E\u5B9A\u4E0B\u6B21\u8BFB\u53D6\u4E0D\u91CD\u590D\u7684\u6570\u636E\uFF1B -group.id=KNOWLEDGE-GROUP-20220905 +group.id=KNOWLEDGE-GROUP-20220928 #--------------------------------topology\u914D\u7F6E------------------------------# #consumer \u5E76\u884C\u5EA6 source.parallelism=1 @@ -19,7 +19,7 @@ sink.parallelism=1 #--------------------------------\u4E1A\u52A1\u914D\u7F6E------------------------------# #1 connection\u65E5\u5FD7 \uFF0C2 dns\u65E5\u5FD7 -log.type=1 +log.type=2 #\u751F\u4EA7\u8005\u538B\u7F29\u6A21\u5F0F none or snappy producer.kafka.compression.type=none @@ -32,9 +32,9 @@ sink.kafka.topic.relation.connection=CONNECTION-RELATION-LOG sink.kafka.topic.relation.dns=DNS-RELATION-LOG #\u5199\u5165clickhouse\u672C\u5730\u8868 -sink.ck.table.connection=connection_record_log_local -sink.ck.table.sketch=connection_sketch_record_log_local -sink.ck.table.dns=dns_record_log_local +sink.ck.table.connection=CONNECTION-RECORD-COMPLETED-LOG +sink.ck.table.sketch=CONNECTION-SKETCH-RECORD-COMPLETED-LOG +sink.ck.table.dns=DNS-RECORD-COMPLETED-LOG sink.ck.table.relation.connection=connection_relation_log_local sink.ck.table.relation.dns=dns_relation_log_local #\u5199arangodb\u8868 @@ -44,10 +44,10 @@ sink.arangodb.table.r.mx.domain2domain=R_MX_DOMAIN2DOMAIN sink.arangodb.table.r.resolve.domain2ip=R_RESOLVE_DOMAIN2IP sink.arangodb.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN -#\u4F7F\u7528flink\u5165\u5E93\u539F\u59CB\u65E5\u5FD70\uFF1A\u5426\uFF0C1\uFF1A\u662F -sink.ck.raw.log.insert.open=1 +#\u5165\u5E93\u539F\u59CB\u65E5\u5FD71\uFF1Aflink\uFF0C2\uFF1Akafka , 3:\u53EA\u7EDF\u8BA1\u65E5\u5FD7 +sink.ck.raw.log.insert.open=2 #clickhouse\u914D\u7F6E\uFF0C\u591A\u4E2A\u9017\u53F7\u8FDE\u63A5 ip1:8123,ip2:8123 -ck.hosts=192.168.44.85:8123 +ck.hosts=192.168.44.12:8123 # ,192.168.44.86:8123,192.168.44.87:8123 ck.database=tsg_galaxy_v3 ck.username=tsg_insert @@ -63,23 +63,23 @@ sink.ck.batch.delay.time=30000 #flink \u65E5\u5FD7\u5EF6\u8FDF\u8D85\u65F6\u65F6\u95F4 flink.watermark.max.delay.time=60 #ck relation\u7EDF\u8BA1\u65F6\u95F4\u95F4\u9694 \u5355\u4F4Ds -log.aggregate.duration=5 +log.aggregate.duration=30 #arangodb \u7EDF\u8BA1\u65F6\u95F4\u95F4\u9694 \u5355\u4F4Ds log.aggregate.duration.graph=10 #arangoDB\u53C2\u6570\u914D\u7F6E -arangodb.host=192.168.44.83 +arangodb.host=192.168.44.12 arangodb.port=8529 arangodb.user=root arangodb.password=galaxy_2019 arangodb.db.name=knowledge -arangodb.batch=10000 +arangodb.batch=50000 arangodb.ttl=3600 arangodb.thread.pool.number=10 #\u6279\u91CF\u7D2F\u8BA1\u65F6\u95F4\u5355\u4F4D\u6BEB\u79D2ms -sink.arangodb.batch.delay.time=1000 +sink.arangodb.batch.delay.time=10000 aggregate.max.value.length=18 -#\u662F\u5426\u5165ip2ip\u8868 1:\u662F +#\u662F\u5426\u5165ip2ip\u8868 0\uFF1A\u5426\uFF0C 1:\u662F sink.arangodb.raw.log.insert.open=0 \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java index 9c5f8e0..ab07376 100644 --- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java +++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java @@ -35,14 +35,15 @@ public class ConnLogService { LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH); //写入ck通联relation表 LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION); - } else { + } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2){ LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION); LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH); LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION); + } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3){ + LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION); } if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) { - DataStream> sketchTransformStream = getSketchTransformStream(sketchSource); //合并通联和通联sketch @@ -96,7 +97,7 @@ public class ConnLogService { } private static DataStream> getConnTransformStream(DataStream> connSource) throws Exception { - DataStream> connTransformStream = connSource + return connSource .assignTimestampsAndWatermarks(WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) .withTimestampAssigner((event, timestamp) -> { @@ -109,7 +110,6 @@ public class ConnLogService { .setParallelism(TRANSFORM_PARALLELISM) .filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0) .setParallelism(TRANSFORM_PARALLELISM); - return connTransformStream; } private static DataStream> getSketchTransformStream(DataStream> sketchSource) throws Exception { diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java index 7c67d6e..dd5aff3 100644 --- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java +++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java @@ -32,9 +32,11 @@ public class DnsLogService { LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS); //dns 拆分后relation日志 ck入库 LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS); - } else { + } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2){ LogService.getLogKafkaSink(dnsSource, SINK_CK_TABLE_DNS); LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS); + } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3){ + LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS); } //arango 入库,按record_type分组入不同的表 @@ -57,18 +59,17 @@ public class DnsLogService { private static DataStream> getLogSource(StreamExecutionEnvironment env, String source) throws Exception { - DataStream> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source)) + return env.addSource(KafkaConsumer.myDeserializationConsumer(source)) .setParallelism(SOURCE_PARALLELISM) .filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("capture_time")) > 0) .setParallelism(SOURCE_PARALLELISM) .map(new DnsMapFunction()) .setParallelism(SOURCE_PARALLELISM) .name(source); - return dnsSource; } private static DataStream> getDnsTransformStream(DataStream> dnsSource) throws Exception { - DataStream> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response"))) + return dnsSource.filter(x -> Objects.nonNull(x.get("response"))) .setParallelism(SOURCE_PARALLELISM) .assignTimestampsAndWatermarks(WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME)) @@ -80,7 +81,6 @@ public class DnsLogService { .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION))) .process(new DnsRelationProcessFunction()) .setParallelism(TRANSFORM_PARALLELISM); - return dnsTransform; } public static void getLogArangoSink(DataStream sourceStream, String sink) throws Exception { diff --git a/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java b/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java index db4b207..d4d05a5 100644 --- a/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java +++ b/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java @@ -24,7 +24,7 @@ public class AGSink extends RichSinkFunction { private static final Log logger = LogFactory.get(); // ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IP - private final List ipWithDataList; + private final CopyOnWriteArrayList ipWithDataList; // 满足此时间条件写出数据 private final long insertArangoTimeInterval = SINK_ARANGODB_BATCH_DELAY_TIME; // 插入的批次 @@ -82,7 +82,7 @@ public class AGSink extends RichSinkFunction { if (ipWithDataList.size() >= this.insertArangoBatchSize) { try { flush(ipWithDataList); - } catch (SQLException e) { + } catch (Exception e) { logger.error("ck sink invoke flush failed.", e); } }