diff --git a/pom.xml b/pom.xml
index de2b189..7cdc355 100644
--- a/pom.xml
+++ b/pom.xml
[pom.xml hunks garbled in extraction: the XML element tags were stripped. Recoverable changes: version properties 1.13.1, 1.0.0, 1.0.8 nearby; a dependency scope changed to provided; an org.apache.maven.plugins build plugin touched; and the clickhouse-jdbc 0.2.6 and com.arangodb arangodb-java-driver dependency entries modified.]
diff --git a/properties/default_config.properties b/properties/default_config.properties
index aaeccfc..25a975d 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -8,58 +8,40 @@
 max.poll.records=5000
 #kafka source poll bytes
 max.partition.fetch.bytes=31457280
 #====================Kafka KafkaProducer====================#
 #Number of producer retries
 retries=0
 #Maximum time a batch may wait after creation before it must be sent, whether or not it is full
 linger.ms=10
 #If no response is received before the timeout, the client resends the request when necessary
 request.timeout.ms=30000
 #The producer sends in batches; batch size, default: 16384
 batch.size=262144
 #Size of the buffer the producer uses to cache messages
 #128M
 buffer.memory=134217728
 #Maximum size of a single request sent to the Kafka server, default 1048576
 #10M
 max.request.size=10485760
 #====================kafka default====================#
 #kafka SASL username - encrypted
 kafka.user=nsyGpHKGFA4KW0zro9MDdw==
 #kafka SASL and SSL password - encrypted
 kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
 #producer ack
 producer.ack=1
-#====================nacos default====================#
-#nacos username
-nacos.username=nacos
-#nacos password
-nacos.pin=nacos
-
-#nacos group
-nacos.group=Galaxy
-#====================Topology Default====================#
-#hbase table name
-hbase.table.name=tsg_galaxy:relation_framedip_account
-
 #Default mail charset
 mail.default.charset=UTF-8
 #0: no validation, 1: weak type validation
 log.transform.type=1
 #Maximum time between two outputs (in milliseconds)
 buffer.timeout=5000
-#====================Temporary config - to be removed====================#
-#Gateway APP_ID lookup endpoint
-app.id.http=http://192.168.44.20:9999/open-api/appDicList
-
-#app_id refresh interval; 0 disables cache refresh
-app.tick.tuple.freq.secs=0
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 91858cd..4de148b 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -8,51 +8,25 @@
 sink.kafka.servers=192.168.45.102:9092
 #zookeeper address, used to configure log_id
 zookeeper.servers=192.168.45.102:2181
-#hbase zookeeper address, used to connect to HBase
-hbase.zookeeper.servers=192.168.45.102:2181
-
 #--------------------------------HTTP/location library------------------------------#
 #location library path
 tools.library=D:\\workerspace\\dat\\
-#--------------------------------nacos config------------------------------#
-#nacos address
-nacos.server=192.168.45.102:8848
-
-#nacos namespace
-nacos.schema.namespace=prod
-
-#nacos data id
-nacos.data.id=session_record.json
-
 #--------------------------------Kafka consumer/producer config------------------------------#
-
-#kafka input topic
-source.kafka.topic=atest
-
-#output topic for completed data
-sink.kafka.topic=atest2
-
 #Consumer group for the source topics; stores this consumer's offsets and can be named after the topology; determines where offsets are kept so the next read does not repeat data
-group.id=flinktest-102
+group.id=knowledge-group
 #--------------------------------topology config------------------------------#
 #consumer parallelism
 source.parallelism=1
 #transform function parallelism
 transform.parallelism=1
 #kafka producer parallelism
 sink.parallelism=1
 #data center id, valid range (0-31)
 data.center.id.num=0
-#hbase cache refresh interval; 0 means the cache is never updated
-hbase.tick.tuple.freq.secs=180
-
 #--------------------------------defaults------------------------------#
 #1: connection logs, 2: dns logs
 log.need.complete=2
@@ -60,7 +34,6 @@
 #producer compression mode: none or snappy
 producer.kafka.compression.type=none
-
 source.kafka.topic.connection=connection_record_log
 source.kafka.topic.sketch=connection_sketch_record_log
 source.kafka.topic.dns=dns_record_log
@@ -78,36 +51,30 @@
 sink.arango.table.r.resolve.domain2ip=R_RESOLVE_DOMAIN2IP
 sink.arango.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN
 #clickhouse ingestion
-ck.hosts=192.168.45.102:8123
+ck.hosts=192.168.45.102:8123,192.168.45.102:8123
 ck.database=tsg_galaxy_v3
 ck.username=default
 ck.pin=galaxy2019
 #in milliseconds
 ck.connection.timeout=10000
 ck.socket.timeout=300000
+ck.batch=10000
-#connection_record_log
-
-flink.watermark.max.orderness=50
-#aggregation interval, in seconds
+#flink max log delay before timeout
+flink.watermark.max.delay.time=50
+#ck relation aggregation interval, in seconds
 log.aggregate.duration=5
+#arangodb aggregation interval, in seconds
 log.aggregate.duration.graph=5
 #arangoDB connection config
 arangoDB.host=192.168.45.102
-#arangoDB.host=192.168.40.224
 arangoDB.port=8529
 arangoDB.user=root
 arangoDB.password=galaxy_2019
 arangoDB.DB.name=knowledge
 arangoDB.batch=100000
 arangoDB.ttl=3600
-
-arangoDB.read.limit=
-update.arango.batch=10000
-
 thread.pool.number=10
-thread.await.termination.time=10
-sink.batch.time.out=5
-sink.batch=10000
\ No newline at end of file
+sink.batch.delay.time=5
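The new ck.hosts value is a comma-separated host list (here the same endpoint twice, apparently standing in for a second replica). ClickhouseSink, further down in this patch, passes it straight into BalancedClickhouseDataSource, which load-balances across the listed endpoints. A minimal standalone sketch of that connection pattern, with the hosts, credentials, and timeouts taken from the properties above (the class name is illustrative):

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

// Sketch of the connection pattern ClickhouseSink uses later in this patch;
// values mirror service_flow_config.properties above.
public class CkConnectionSketch {
    public static void main(String[] args) throws SQLException {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setUser("default");
        props.setPassword("galaxy2019");
        props.setDatabase("tsg_galaxy_v3");
        props.setConnectionTimeout(10000); // ck.connection.timeout
        props.setSocketTimeout(300000);    // ck.socket.timeout

        // Comma-separated host list, as in the new ck.hosts value.
        BalancedClickhouseDataSource ds = new BalancedClickhouseDataSource(
                "jdbc:clickhouse://192.168.45.102:8123,192.168.45.102:8123", props);
        ds.scheduleActualization(10, TimeUnit.SECONDS); // periodically re-ping hosts
        try (Connection conn = ds.getConnection()) {
            System.out.println("connected: " + !conn.isClosed());
        }
    }
}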
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 57f09a0..3b68285 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -38,17 +38,6 @@ public class FlowWriteConfig {
      */
     public static final String ENCODING = "UTF8";
 
-    /**
-     * Nacos
-     */
-    public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server");
-    public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace");
-    public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace");
-    public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id");
-    public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin");
-    public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group");
-    public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username");
-
     /**
      * System config
      */
@@ -61,12 +50,6 @@ public class FlowWriteConfig {
     public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
     public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
 
-    /**
-     * HBase
-     */
-    public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
-    public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
-
     /**
      * kafka common
      */
@@ -76,7 +59,6 @@ public class FlowWriteConfig {
     /**
      * kafka source config
      */
-    public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic");
     public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
     public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
     public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
@@ -85,7 +67,6 @@ public class FlowWriteConfig {
     /**
      * kafka sink config
      */
-    public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic");
     public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
     public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
@@ -99,12 +80,6 @@ public class FlowWriteConfig {
     public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
     public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
 
-    /**
-     * http
-     */
-    public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http");
-    public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");
-
     /**
      * common config
      */
@@ -112,7 +87,6 @@ public class FlowWriteConfig {
     public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
     public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
     public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
-    public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
 
     /*
@@ -125,7 +99,7 @@ public class FlowWriteConfig {
     public static final int CK_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.connection.timeout");
     public static final int CK_SOCKET_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.socket.timeout");
-    public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.orderness");
+    public static final int FLINK_WATERMARK_MAX_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.delay.time");
     public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration");
     public static final int LOG_AGGREGATE_DURATION_GRAPH = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration.graph");
     public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");
@@ -147,8 +121,6 @@ public class FlowWriteConfig {
     public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.nx.domain2domain");
 
-
-
     public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
     public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
     public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user");
@@ -156,11 +128,8 @@
     public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name");
     public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl");
     public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch");
-
-    public static final Integer UPDATE_ARANGO_BATCH = FlowWriteConfigurations.getIntProperty(0, "update.arango.batch");
-    public static final String ARANGODB_READ_LIMIT = FlowWriteConfigurations.getStringProperty(0, "arangoDB.read.limit");
     public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number");
-    public static final Integer THREAD_AWAIT_TERMINATION_TIME = FlowWriteConfigurations.getIntProperty(0, "thread.await.termination.time");
-    public static final Integer SINK_BATCH_TIME_OUT = FlowWriteConfigurations.getIntProperty(0, "sink.batch.time.out");
-    public static final Integer SINK_BATCH = FlowWriteConfigurations.getIntProperty(0, "sink.batch");
+
+    public static final Integer SINK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.batch.delay.time");
+    public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
 }
\ No newline at end of file
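Every constant above is resolved through FlowWriteConfigurations.getStringProperty/getIntProperty, whose first argument selects the backing file; judging by which keys live in which file, 0 appears to map to service_flow_config.properties and 1 to default_config.properties. The resolver itself is not part of this patch, so the following is only a hypothetical sketch under that assumption:

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical stand-in for the FlowWriteConfigurations class, which this
// patch does not include. Assumed mapping: 0 = service_flow_config.properties,
// 1 = default_config.properties.
public final class FlowWriteConfigurationsSketch {
    private static final Properties[] SOURCES = {
            load("/service_flow_config.properties"),
            load("/default_config.properties")
    };

    private FlowWriteConfigurationsSketch() {
    }

    private static Properties load(String resource) {
        Properties p = new Properties();
        try (InputStream in = FlowWriteConfigurationsSketch.class.getResourceAsStream(resource)) {
            if (in != null) {
                p.load(in);
            }
        } catch (IOException ignored) {
            // fall through with an empty Properties object
        }
        return p;
    }

    public static String getStringProperty(int source, String key) {
        return SOURCES[source].getProperty(key);
    }

    public static int getIntProperty(int source, String key) {
        return Integer.parseInt(SOURCES[source].getProperty(key));
    }
}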
diff --git a/src/main/java/com/zdjizhi/common/CKWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
similarity index 84%
rename from src/main/java/com/zdjizhi/common/CKWindow.java
rename to src/main/java/com/zdjizhi/etl/CKBatchWindow.java
index b7c7b8c..f66455f 100644
--- a/src/main/java/com/zdjizhi/common/CKWindow.java
+++ b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl;
 
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -9,7 +9,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-public class CKWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
+public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
 
     @Override
     public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
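Only the package and class name of CKBatchWindow change here (similarity index 84%), so the method body is not shown in the hunk. From the apply signature, the class collects a window's records into a single list so the ClickHouse sink can insert them as one batch. A sketch of that presumed body, for context only:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch of the batching behavior implied by the signature above; the actual
// body is unchanged by this patch and therefore not visible in it.
public class CKBatchWindowSketch implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {

    @Override
    public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable,
                      Collector<List<Map<String, Object>>> out) {
        List<Map<String, Object>> batch = new ArrayList<>();
        for (Map<String, Object> record : iterable) {
            batch.add(record);
        }
        if (!batch.isEmpty()) {
            out.collect(batch); // one list per window -> one ClickHouse batch insert
        }
    }
}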
diff --git a/src/main/java/com/zdjizhi/common/ArangodbIPWindow.java b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
similarity index 90%
rename from src/main/java/com/zdjizhi/common/ArangodbIPWindow.java
rename to src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
index f91fb08..f5848c4 100644
--- a/src/main/java/com/zdjizhi/common/ArangodbIPWindow.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.connection;
 
 import cn.hutool.core.util.StrUtil;
 import com.arangodb.entity.BaseEdgeDocument;
@@ -11,7 +11,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-public class ArangodbIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
+public class ArangodbBatchIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
 
     @Override
     public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception {
diff --git a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
similarity index 78%
rename from src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
rename to src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
index 49041dc..cec2425 100644
--- a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;
 
 import cn.hutool.core.convert.Convert;
 import cn.hutool.log.Log;
@@ -24,21 +24,8 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2, TimeWindow> {
     public void process(Tuple2 keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
-        Map<String, Object> middleResult = getMiddleResult(keys, elements);
-        try {
-            if (middleResult != null) {
-                out.collect(middleResult);
-                logger.debug("Intermediate aggregation result: {}", middleResult.toString());
-            }
-        } catch (Exception e) {
-            logger.error("Failed to get the intermediate aggregation result, middleResult: {}\n{}", middleResult.toString(), e);
-        }
-    }
-
-    private Map<String, Object> getMiddleResult(Tuple2 keys, Iterable<Map<String, Object>> elements) {
-
-        Tuple5 values = connAggregate(elements);
         try {
+            Tuple5 values = connAggregate(elements);
             if (values != null) {
                 Map<String, Object> result = new LinkedHashMap<>();
                 result.put("start_time", values.f0);
@@ -48,13 +35,12 @@ public class ConnProcessFunction extends ProcessWindowFunction
     private Tuple5 connAggregate(Iterable<Map<String, Object>> elements) {
diff --git a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
similarity index 97%
rename from src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java
rename to src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
index 5bce25d..e9dd0e2 100644
--- a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;
 
 import cn.hutool.core.convert.Convert;
 import cn.hutool.core.date.DateUtil;
diff --git a/src/main/java/com/zdjizhi/common/IpKeysSelector.java b/src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java
similarity index 93%
rename from src/main/java/com/zdjizhi/common/IpKeysSelector.java
rename to src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java
index 2dc7e79..470648e 100644
--- a/src/main/java/com/zdjizhi/common/IpKeysSelector.java
+++ b/src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.connection;
 
 import org.apache.flink.api.java.functions.KeySelector;
 
diff --git a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
similarity index 98%
rename from src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
rename to src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
index 54d53b6..698ed55 100644
--- a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;
 
 import cn.hutool.core.convert.Convert;
 import cn.hutool.log.Log;
diff --git a/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java
similarity index 90%
rename from src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java
rename to src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java
index 373eef5..ada4b83 100644
--- a/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java
+++ b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.dns;
 
 import cn.hutool.core.util.StrUtil;
 import com.arangodb.entity.BaseEdgeDocument;
@@ -11,7 +11,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-public class ArangodbDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
+public class ArangodbBatchDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
 
     @Override
     public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception {
diff --git a/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java
similarity index 95%
rename from src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java
rename to src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java
index 5aa08c5..f6e6a6f 100644
--- a/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.dns;
 
 import cn.hutool.core.util.StrUtil;
 import org.apache.flink.api.java.functions.KeySelector;
diff --git a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
similarity index 98%
rename from src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
rename to src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
index 18d7a71..d8b3a36 100644
--- a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;
 
 import cn.hutool.core.convert.Convert;
 import cn.hutool.log.Log;
diff --git a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
similarity index 96%
rename from src/main/java/com/zdjizhi/etl/DnsMapFunction.java
rename to src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
index 86c4616..b536152 100644
--- a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;
 
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
@@ -47,7 +47,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<String, Object>>
[Extraction gap: the remainder of the DnsMapFunction hunk and the diff header for LogFlowWriteTopology.java are missing; the hunks below modify LogFlowWriteTopology.]
         DataStream<Map<String, Object>> connTransformStream = connSource
                 .assignTimestampsAndWatermarks(WatermarkStrategy
-                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
                         .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
                 .keyBy(new IpKeysSelector())
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
@@ -58,7 +60,7 @@ public class LogFlowWriteTopology {
                 .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
 
         DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
-                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
                 .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
                 .keyBy(new IpKeysSelector())
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
@@ -72,16 +74,16 @@ public class LogFlowWriteTopology {
                 .keyBy(new IpKeysSelector())
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                 .process(new Ip2IpGraphProcessFunction())
-//                .filter(Objects::nonNull)
+                .filter(Objects::nonNull)
                 .setParallelism(TRANSFORM_PARALLELISM);
 
             //Write to the ClickHouse sink in batches
-            connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
-            sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
-            sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+            connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
+            sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
+            sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
 
             //Write to arangodb
-            ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
+            ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
 
         } else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
@@ -92,7 +94,7 @@ public class LogFlowWriteTopology {
                 .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
 
         DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
-                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_ORDERNESS))
+                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                 .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
                 .flatMap(new DnsSplitFlatMapFunction())
                 .keyBy(new DnsGraphKeysSelector())
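All of these streams use the same event-time pattern: a bounded-out-of-orderness watermark sized by the renamed flink.watermark.max.delay.time key, a timestamp assigner that promotes a seconds-resolution event field to epoch milliseconds, then keyed tumbling windows. The pattern in isolation, with the helper and field name being illustrative rather than taken from the repository:

import java.time.Duration;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// Illustrative helper showing the watermark pattern used above; the field
// name and method are not part of the patched codebase.
public final class EventTimeSetup {

    private EventTimeSetup() {
    }

    /** Attaches a bounded-out-of-orderness watermark to a stream of map records. */
    public static DataStream<Map<String, Object>> withEventTime(
            DataStream<Map<String, Object>> source, String timeField, int maxDelaySeconds) {
        return source.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        // tolerate events arriving up to maxDelaySeconds late
                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(maxDelaySeconds))
                        // seconds-resolution event field promoted to epoch millis
                        .withTimestampAssigner((event, ts) -> ((Number) event.get(timeField)).longValue() * 1000L));
    }
}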
@@ -104,13 +106,13 @@ public class LogFlowWriteTopology {
         //dns raw logs into ClickHouse
         dnsSource.filter(Objects::nonNull)
                 .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
+                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
                 .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
                 .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
                 .name("CKSink");
 
         //dns relation logs (after splitting) into ClickHouse
-        dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
+        dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
                 .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
                 .setParallelism(SINK_PARALLELISM)
                 .name("CKSink");
@@ -119,12 +121,11 @@ public class LogFlowWriteTopology {
         DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
                 .process(new DnsGraphProcessFunction())
-                .setParallelism(SINK_PARALLELISM)
-                .filter(Objects::nonNull);
+                .setParallelism(SINK_PARALLELISM);
 
         for (DnsType dnsEnum : DnsType.values()) {
             dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
-                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbDnsWindow())
+                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchDnsWindow())
                     .addSink(new ArangoDBSink(dnsEnum.getSink()))
                     .setParallelism(SINK_PARALLELISM)
                     .name("ArangodbSink");
diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
deleted file mode 100644
index 1425ce9..0000000
--- a/src/main/java/com/zdjizhi/utils/app/AppUtils.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package com.zdjizhi.utils.app;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.http.HttpClientUtil;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * AppId utility class
- *
- * @author qidaijie
- */
-
-@Deprecated
-public class AppUtils {
-    private static final Log logger = LogFactory.get();
-    private static Map appIdMap = new ConcurrentHashMap<>(128);
-    private static AppUtils appUtils;
-
-    private static void getAppInstance() {
-        appUtils = new AppUtils();
-    }
-
-
-    /**
-     * Constructor (new)
-     */
-    private AppUtils() {
-        //refresh on a schedule
-        updateAppIdCache();
-    }
-
-    /**
-     * Update state
-     */
-    private static void change() {
-        if (appUtils == null) {
-            getAppInstance();
-        }
-        timestampsFilter();
-    }
-
-    /**
-     * Fetch the changed entries
-     */
-    private static void timestampsFilter() {
-        try {
-            Long begin = System.currentTimeMillis();
-            String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
-            if (StringUtil.isNotBlank(schema)) {
-                String data = JSONObject.parseObject(schema).getString("data");
-                JSONArray objects = JSONArray.parseArray(data);
-                for (Object object : objects) {
-                    JSONArray jsonArray = JSONArray.parseArray(object.toString());
-                    int key = jsonArray.getInteger(0);
-                    String value = jsonArray.getString(1);
-                    if (appIdMap.containsKey(key)) {
-                        if (!value.equals(appIdMap.get(key))) {
-                            appIdMap.put(key, value);
-                        }
-                    } else {
-                        appIdMap.put(key, value);
-                    }
-                }
-                logger.warn("Updating the correspondence takes time:" + (begin - System.currentTimeMillis()));
-                logger.warn("Pull the length of the interface data:[" + objects.size() + "]");
-            }
-        } catch (RuntimeException e) {
-            logger.error("Update cache app-id failed, exception:" + e);
-        }
-    }
-
-
-    /**
-     * Validation timer: runs periodically to re-validate and fetch a fresh Cookie
-     */
-    private void updateAppIdCache() {
-        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
-        executorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) {
-                        change();
-                    }
-                } catch (RuntimeException e) {
-                    logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
-                }
-            }
-        }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
-    }
-
-
-    /**
-     * Get appName
-     *
-     * @param appId app_id
-     * @return account
-     */
-    public static String getAppName(int appId) {
-
-        if (appUtils == null) {
-            getAppInstance();
-        }
-
-        if (appIdMap.containsKey(appId)) {
-            return appIdMap.get(appId);
-        } else {
-            logger.warn("AppMap get appName is null, ID is :" + appId);
-            return "";
-        }
-    }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java
index d2306b7..d94dad7 100644
--- a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java
+++ b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java
@@ -17,9 +17,8 @@ public class ArangoDBSink extends RichSinkFunction<List<BaseEdgeDocument>> {
     private String collection;
 
     @Override
-    public void invoke(List<BaseEdgeDocument> BaseEdgeDocuments, Context context) throws Exception {
-
-        arangoDBConnect.overwrite(BaseEdgeDocuments, getCollection());
+    public void invoke(List<BaseEdgeDocument> baseEdgeDocuments, Context context) throws Exception {
+        arangoDBConnect.overwrite(baseEdgeDocuments, getCollection());
     }
 
     @Override
@@ -47,3 +46,4 @@ public class ArangoDBSink extends RichSinkFunction<List<BaseEdgeDocument>> {
     }
 }
+
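ArangoDBConnect.overwrite(...) is not included in this patch. Given the arangodb-java-driver dependency and the "overwrite" naming, it plausibly performs a batched insert that replaces documents whose key already exists; a rough guess at its shape follows (class layout and method names are assumptions, driver 6.x API):

import com.arangodb.ArangoCollection;
import com.arangodb.ArangoDB;
import com.arangodb.ArangoDatabase;
import com.arangodb.entity.BaseEdgeDocument;
import com.arangodb.model.DocumentCreateOptions;
import java.util.List;

// Sketch of what ArangoDBConnect.overwrite(...) might do; the real class is
// not part of this patch.
public class ArangoDBConnectSketch {
    private final ArangoDatabase db;

    public ArangoDBConnectSketch(String host, int port, String user, String password, String dbName) {
        ArangoDB arango = new ArangoDB.Builder().host(host, port).user(user).password(password).build();
        this.db = arango.db(dbName);
    }

    /** Inserts edges in one batch, replacing documents whose _key already exists. */
    public void overwrite(List<BaseEdgeDocument> edges, String collection) {
        if (edges == null || edges.isEmpty()) {
            return;
        }
        ArangoCollection col = db.collection(collection);
        col.insertDocuments(edges, new DocumentCreateOptions().overwrite(true));
    }
}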
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index cb37500..d3e14cc 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -29,7 +29,6 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {
     private PreparedStatement preparedStatement;
     public String sink;
 
-
     public ClickhouseSink(String sink) {
         this.sink = sink;
     }
@@ -49,7 +48,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {
 
     @Override
     public void open(Configuration parameters) throws Exception {
-
+        super.open(parameters);
         try {
//            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
//            connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
@@ -64,8 +63,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {
             BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
             dataSource.scheduleActualization(10, TimeUnit.SECONDS); //enable host liveness checks
             connection = dataSource.getConnection();
-
-            log.info("get clickhouse connection success");
+            log.debug("get clickhouse connection success");
         } catch (SQLException e) {
             log.error("clickhouse connection error ,{}", e);
         }
@@ -102,7 +100,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>> {
             preparedStatement.addBatch();
             count++;
             //commit once every 10k rows
-            if (count % SINK_BATCH == 0) {
+            if (count % CK_BATCH == 0) {
                 preparedStatement.executeBatch();
                 connection.commit();
                 preparedStatement.clearBatch();
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
index 27daa71..42b7639 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
@@ -113,11 +113,6 @@ public class TransFormMap {
                     JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
                 }
                 break;
-            case "app_match":
-                if (logValue != null && appendTo == null) {
-                    JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
-                }
-                break;
             default:
         }
     }
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
index 34cabfa..9e92576 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
@@ -115,11 +115,6 @@ public class TransFormTypeMap {
                     JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
                 }
                 break;
-            case "app_match":
-                if (logValue != null && appendToKeyValue == null) {
-                    JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
-                }
-                break;
             default:
         }
     }
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index e3363f9..84fe5cc 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -1,7 +1,6 @@
 package com.zdjizhi.utils.general;
 
 import cn.hutool.core.codec.Base64;
-import cn.hutool.core.util.StrUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.jayway.jsonpath.InvalidPathException;
@@ -10,7 +9,6 @@
 import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.utils.FormatUtils;
 import com.zdjizhi.utils.IpLookupV2;
 import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.app.AppUtils;
 import com.zdjizhi.utils.json.JsonParseUtil;
 
 import java.math.BigInteger;
@@ -126,22 +124,6 @@ class TransFunction {
      * @return account
      */
 
-    /**
-     * Complete appName from the cached appId mapping
-     *
-     * @param appIds app id list
-     * @return appName
-     */
-    @Deprecated
-    static String appMatch(String appIds) {
-        try {
-            String appId = StrUtil.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
-            return AppUtils.getAppName(Integer.parseInt(appId));
-        } catch (NumberFormatException | ClassCastException exception) {
-            logger.error("Failed to split/convert the APP ID list, offending list: " + appIds);
-            return "";
-        }
-    }
 
     /**
      * Parse the top-level domain
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index 97a53da..e15d535 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -14,10 +14,10 @@ import java.util.Properties;
 * @date 2021/6/8 13:54
 */
 public class KafkaConsumer {
-    private static Properties createConsumerConfig() {
+    private static Properties createConsumerConfig(String topic) {
         Properties properties = new Properties();
         properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
-        properties.put("group.id", FlowWriteConfig.GROUP_ID);
+        properties.put("group.id", FlowWriteConfig.GROUP_ID + "-" + topic);
         properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
         properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
         properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
@@ -33,9 +33,9 @@ public class KafkaConsumer {
      * @return kafka logs -> map
      */
     @SuppressWarnings("unchecked")
-    public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer(String topic) {
-        FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
-                new TimestampDeserializationSchema(), createConsumerConfig());
+    public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer(String topic) {
+        FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+                new TimestampDeserializationSchema(), createConsumerConfig(topic));
 
         //Commit offsets back to Kafka as checkpoints complete
         kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
@@ -51,9 +51,9 @@ public class KafkaConsumer {
      *
      * @return kafka logs
      */
-    public static FlinkKafkaConsumer<String> flinkConsumer() {
-        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
-                new SimpleStringSchema(), createConsumerConfig());
+    public static FlinkKafkaConsumer<String> flinkConsumer(String topic) {
+        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+                new SimpleStringSchema(), createConsumerConfig(topic));
 
         //Commit offsets back to Kafka as checkpoints complete
         kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index 28ecff9..cf27cc3 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -33,9 +33,9 @@ public class KafkaProducer {
     }
 
-    public static FlinkKafkaProducer<String> getKafkaProducer() {
+    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
         FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
-                FlowWriteConfig.SINK_KAFKA_TOPIC,
+                topic,
                 new SimpleStringSchema(),
                 createProducerConfig(),
                 //Connect the sink to all partitions and write round-robin;
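With createConsumerConfig(topic) now appending the topic name to group.id, each source topic gets its own consumer group (e.g. knowledge-group-dns_record_log under the new knowledge-group prefix), so offsets are tracked independently per topic even though all consumers share one configured group.id. Illustrative wiring only; the job name and print sink are placeholders:

import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.zdjizhi.utils.kafka.KafkaConsumer;

// Demonstrates the per-topic consumer-group behavior introduced above.
public class ConsumerGroupExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // group.id resolves to "<group.id>-dns_record_log" under the new scheme,
        // so this topic's offsets are committed under their own group.
        DataStream<Map<String, Object>> dnsSource =
                env.addSource(KafkaConsumer.myDeserializationConsumer("dns_record_log"));

        dnsSource.print();
        env.execute("consumer-group-demo");
    }
}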