Optimize code: remove unused classes
pom.xml
@@ -36,7 +36,7 @@
 <flink.version>1.13.1</flink.version>
 <kafka.version>1.0.0</kafka.version>
 <zdjz.tools.version>1.0.8</zdjz.tools.version>
-<!-- <scope.type>provided</scope.type>-->
+<scope.type>provided</scope.type>
 <!-- <scope.type>compile</scope.type>-->
 </properties>

@@ -65,7 +65,7 @@
 </executions>
 </plugin>

-<plugin>
+<!-- <plugin>
 <groupId>io.github.zlika</groupId>
 <artifactId>reproducible-build-maven-plugin</artifactId>
 <version>0.2</version>
@@ -77,7 +77,7 @@
 <phase>package</phase>
 </execution>
 </executions>
-</plugin>
+</plugin>-->

 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
@@ -243,11 +243,11 @@
 <artifactId>clickhouse-jdbc</artifactId>
 <version>0.2.6</version>
 </dependency>
-<dependency>
+<!-- <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner-blink_2.12</artifactId>
 <version>${flink.version}</version>
-</dependency>
+</dependency>-->
 <dependency>
 <groupId>com.arangodb</groupId>
 <artifactId>arangodb-java-driver</artifactId>
@@ -8,58 +8,40 @@ max.poll.records=5000
 #kafka source poll bytes
 max.partition.fetch.bytes=31457280
 #====================Kafka KafkaProducer====================#
 # Number of producer retries
 retries=0

 # Maximum time a batch may wait after creation before it must be sent, whether or not it is full
 linger.ms=10

 # If no response is received before the timeout, the client re-sends the request when necessary
 request.timeout.ms=30000

 # The producer sends in batches; batch size, default 16384
 batch.size=262144

 # Buffer size used on the producer side to cache messages
 #128M
 buffer.memory=134217728

 # Maximum size of a single request sent to the Kafka broker, default 1048576
 #10M
 max.request.size=10485760
 #====================kafka default====================#
 # Kafka SASL username (encrypted)
 kafka.user=nsyGpHKGFA4KW0zro9MDdw==

 # Kafka SASL/SSL password (encrypted)
 kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ

 # Producer acks
 producer.ack=1
-#====================nacos default====================#
-#nacos username
-nacos.username=nacos

-#nacos password
-nacos.pin=nacos

-#nacos group
-nacos.group=Galaxy
-#====================Topology Default====================#
-#hbase table name
-hbase.table.name=tsg_galaxy:relation_framedip_account

 # Default mail encoding
 mail.default.charset=UTF-8

 # 0: no validation, 1: weak type validation
 log.transform.type=1

 # Maximum time between two outputs (in milliseconds)
 buffer.timeout=5000
-#====================Temporary config - to be removed====================#
-# Gateway APP_ID lookup endpoint
-app.id.http=http://192.168.44.20:9999/open-api/appDicList

-# app_id refresh interval; 0 disables cache refresh
-app.tick.tuple.freq.secs=0
@@ -8,51 +8,25 @@ sink.kafka.servers=192.168.45.102:9092
 # ZooKeeper address, used to configure log_id
 zookeeper.servers=192.168.45.102:2181

-# HBase ZooKeeper address, used to connect to HBase
-hbase.zookeeper.servers=192.168.45.102:2181

 #--------------------------------HTTP / location library------------------------------#
 # Location library path
 tools.library=D:\\workerspace\\dat\\

-#--------------------------------nacos config------------------------------#
-# nacos address
-nacos.server=192.168.45.102:8848

-#nacos namespace
-nacos.schema.namespace=prod

-#nacos data id
-nacos.data.id=session_record.json

 #--------------------------------Kafka consumer/producer config------------------------------#

-# Kafka source topic for incoming data
-source.kafka.topic=atest

-# Output topic for enriched data
-sink.kafka.topic=atest2

 # Consumer group used when reading the topic; it stores the consumer offsets for this spout id, can be named after the topology, and determines where offsets are kept so the next read does not re-consume data
-group.id=flinktest-102
+group.id=knowledge-group

 #--------------------------------topology config------------------------------#

 # consumer parallelism
 source.parallelism=1

 # transform function parallelism
 transform.parallelism=1

 # kafka producer parallelism
 sink.parallelism=1

 # Data center id, value range (0-31)
 data.center.id.num=0

-# HBase refresh interval; 0 disables cache refresh
-hbase.tick.tuple.freq.secs=180

 #--------------------------------default values------------------------------#
 # 1: connection logs, 2: dns logs
 log.need.complete=2
@@ -60,7 +34,6 @@ log.need.complete=2
 # Producer compression mode: none or snappy
 producer.kafka.compression.type=none


 source.kafka.topic.connection=connection_record_log
 source.kafka.topic.sketch=connection_sketch_record_log
 source.kafka.topic.dns=dns_record_log
@@ -78,36 +51,30 @@ sink.arango.table.r.resolve.domain2ip=R_RESOLVE_DOMAIN2IP
 sink.arango.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN

 # ClickHouse sink
-ck.hosts=192.168.45.102:8123
+ck.hosts=192.168.45.102:8123,192.168.45.102:8123
 ck.database=tsg_galaxy_v3
 ck.username=default
 ck.pin=galaxy2019
 # in milliseconds
 ck.connection.timeout=10000
 ck.socket.timeout=300000
+ck.batch=10000

-#connection_record_log
-flink.watermark.max.orderness=50
-# Aggregation interval, in seconds
+# Flink max delay time for late logs
+flink.watermark.max.delay.time=50
+# ck relation aggregation interval, in seconds
 log.aggregate.duration=5
+# arangodb aggregation interval, in seconds
 log.aggregate.duration.graph=5

 # ArangoDB parameters
 arangoDB.host=192.168.45.102
-#arangoDB.host=192.168.40.224
 arangoDB.port=8529
 arangoDB.user=root
 arangoDB.password=galaxy_2019
 arangoDB.DB.name=knowledge
 arangoDB.batch=100000
 arangoDB.ttl=3600

-arangoDB.read.limit=
-update.arango.batch=10000

 thread.pool.number=10
-thread.await.termination.time=10

-sink.batch.time.out=5
-sink.batch=10000
+sink.batch.delay.time=5
@@ -38,17 +38,6 @@ public class FlowWriteConfig {
 */
 public static final String ENCODING = "UTF8";

-/**
-* Nacos
-*/
-public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server");
-public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace");
-public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace");
-public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id");
-public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin");
-public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group");
-public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username");

 /**
 * System config
 */
@@ -61,12 +50,6 @@ public class FlowWriteConfig {
 public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
 public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");

-/**
-* HBase
-*/
-public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
-public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");

 /**
 * kafka common
 */
@@ -76,7 +59,6 @@ public class FlowWriteConfig {
 /**
 * kafka source config
 */
-public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic");
 public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
 public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
 public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
@@ -85,7 +67,6 @@ public class FlowWriteConfig {
 /**
 * kafka sink config
 */
-public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic");
 public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
 public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");

@@ -99,12 +80,6 @@ public class FlowWriteConfig {
 public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
 public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");

-/**
-* http
-*/
-public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http");
-public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");

 /**
 * common config
 */
@@ -112,7 +87,6 @@ public class FlowWriteConfig {
 public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
 public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
 public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
-public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");


 /*
@@ -125,7 +99,7 @@ public class FlowWriteConfig {
 public static final int CK_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.connection.timeout");
 public static final int CK_SOCKET_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.socket.timeout");

-public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.orderness");
+public static final int FLINK_WATERMARK_MAX_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.delay.time");
 public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration");
 public static final int LOG_AGGREGATE_DURATION_GRAPH = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration.graph");
 public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");
@@ -147,8 +121,6 @@ public class FlowWriteConfig {
 public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.nx.domain2domain");

-
-

 public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
 public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
 public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user");
@@ -156,11 +128,8 @@ public class FlowWriteConfig {
 public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name");
 public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl");
 public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch");

-public static final Integer UPDATE_ARANGO_BATCH = FlowWriteConfigurations.getIntProperty(0, "update.arango.batch");
-public static final String ARANGODB_READ_LIMIT = FlowWriteConfigurations.getStringProperty(0, "arangoDB.read.limit");
 public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number");
-public static final Integer THREAD_AWAIT_TERMINATION_TIME = FlowWriteConfigurations.getIntProperty(0, "thread.await.termination.time");
-public static final Integer SINK_BATCH_TIME_OUT = FlowWriteConfigurations.getIntProperty(0, "sink.batch.time.out");
-public static final Integer SINK_BATCH = FlowWriteConfigurations.getIntProperty(0, "sink.batch");
+public static final Integer SINK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.batch.delay.time");
+public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
 }
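The constants removed from FlowWriteConfig above track the keys deleted from the properties files earlier in this commit (Nacos, HBase, HTTP, source/sink topic), while the renamed ones keep the same getIntProperty/getStringProperty lookup pattern. FlowWriteConfigurations itself is not part of the diff, so the following is only a minimal stand-in sketch of that pattern; the resource name flow_write.properties and the single-argument getIntProperty signature are assumptions, not the project's actual API.

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Stand-in sketch: load a properties file once and expose typed constants,
// the way FlowWriteConfig consumes the renamed keys.
public class ConfigSketch {
    private static final Properties PROPS = new Properties();

    static {
        // hypothetical resource name; the real project loads its own files
        try (InputStream in = ConfigSketch.class.getResourceAsStream("/flow_write.properties")) {
            if (in != null) {
                PROPS.load(in);
            }
        } catch (IOException ignored) {
            // fall back to an empty Properties object
        }
    }

    static int getIntProperty(String key) {
        return Integer.parseInt(PROPS.getProperty(key, "0"));
    }

    public static final int SINK_BATCH_DELAY_TIME = getIntProperty("sink.batch.delay.time");
    public static final int CK_BATCH = getIntProperty("ck.batch");
    public static final int FLINK_WATERMARK_MAX_DELAY_TIME = getIntProperty("flink.watermark.max.delay.time");
}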
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl;

 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -9,7 +9,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;

-public class CKWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
+public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {

 @Override
 public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.connection;

 import cn.hutool.core.util.StrUtil;
 import com.arangodb.entity.BaseEdgeDocument;
@@ -11,7 +11,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;

-public class ArangodbIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
+public class ArangodbBatchIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {

 @Override
 public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception {
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;

 import cn.hutool.core.convert.Convert;
 import cn.hutool.log.Log;
@@ -24,21 +24,8 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec

 @Override
 public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
-Map<String, Object> middleResult = getMiddleResult(keys, elements);
-try {
-if (middleResult != null) {
-out.collect(middleResult);
-logger.debug("Intermediate aggregation result: {}", middleResult.toString());
-}
-} catch (Exception e) {
-logger.error("Failed to get intermediate aggregation result, middleResult: {}\n{}", middleResult.toString(), e);
-}
-}
-
-private Map<String, Object> getMiddleResult(Tuple2<String, String> keys, Iterable<Map<String, Object>> elements) {
-
-Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
 try {
+Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
 if (values != null) {
 Map<String, Object> result = new LinkedHashMap<>();
 result.put("start_time", values.f0);
@@ -48,13 +35,12 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
 result.put("sessions", values.f2);
 result.put("packets", values.f3);
 result.put("bytes", values.f4);
-return result;
+out.collect(result);
+logger.debug("Intermediate aggregation result: {}", result.toString());
 }

 } catch (Exception e) {
-logger.error("Failed to load intermediate result set, keys: {} values: {}\n{}", keys, values, e);
+logger.error("Failed to get intermediate aggregation result: {}", e);
 }
-return null;
 }

 private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) {
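The refactor above folds getMiddleResult into process() and emits through out.collect instead of returning a possibly-null map, so the window operator never forwards null elements. A minimal sketch of the resulting shape, assuming the Flink 1.13 ProcessWindowFunction API already used in this commit; connAggregate and the field names come from the diff, the class name is illustrative.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: aggregate one keyed window and emit the result directly from process().
public abstract class ConnProcessSketch
        extends ProcessWindowFunction<Map<String, Object>, Map<String, Object>, Tuple2<String, String>, TimeWindow> {

    @Override
    public void process(Tuple2<String, String> keys, Context context,
                        Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
        try {
            Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
            if (values != null) {
                Map<String, Object> result = new LinkedHashMap<>();
                result.put("start_time", values.f0);
                // ... end_time / sessions / packets / bytes filled in as in the diff ...
                out.collect(result);   // emit inside the try block instead of returning null
            }
        } catch (Exception e) {
            // log and drop the window; no null result reaches downstream operators
        }
    }

    // the real implementation sums the per-key counters, as shown in the commit
    protected abstract Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements);
}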
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;

 import cn.hutool.core.convert.Convert;
 import cn.hutool.core.date.DateUtil;
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.connection;


 import org.apache.flink.api.java.functions.KeySelector;
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;

 import cn.hutool.core.convert.Convert;
 import cn.hutool.log.Log;
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.dns;

 import cn.hutool.core.util.StrUtil;
 import com.arangodb.entity.BaseEdgeDocument;
@@ -11,7 +11,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;

-public class ArangodbDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
+public class ArangodbBatchDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {

 @Override
 public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception {
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.dns;

 import cn.hutool.core.util.StrUtil;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;

 import cn.hutool.core.convert.Convert;
 import cn.hutool.log.Log;
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;

 import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
@@ -47,7 +47,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
 } else if (DnsType.CNAME.getCode().equals(type)) {
 dnsCNAME = Joiner.on(",").skipNulls().join(dnsCNAME, body);
 dnsCNAMENum++;
-} else if (DnsType.CNAME.getCode().equals(type)) {
+} else if (DnsType.NS.getCode().equals(type)) {
 dnsNs = Joiner.on(",").skipNulls().join(dnsNs, body);
 dnsNsNum++;
 } else if (DnsType.MX.getCode().equals(type)) {
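The one-line change in DnsMapFunction above is a bug fix: the second branch used to test DnsType.CNAME a second time, so the NS accumulator was unreachable. A minimal, self-contained sketch of the corrected dispatch; the numeric codes ("5" for CNAME, "2" for NS), the class name and the fields are illustrative stand-ins for the project's DnsType.getCode() values.

import com.google.common.base.Joiner;

// Sketch of the corrected branch order after the fix.
public class DnsBranchSketch {
    private String dnsCNAME = null;
    private String dnsNs = null;
    private int dnsCNAMENum = 0;
    private int dnsNsNum = 0;

    void accumulate(String type, String body) {
        if ("5".equals(type)) {                // CNAME
            dnsCNAME = Joiner.on(",").skipNulls().join(dnsCNAME, body);
            dnsCNAMENum++;
        } else if ("2".equals(type)) {         // NS -- this branch compared against CNAME before the fix
            dnsNs = Joiner.on(",").skipNulls().join(dnsNs, body);
            dnsNsNum++;
        }
    }
}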
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;

 import cn.hutool.core.convert.Convert;
 import cn.hutool.core.date.DateUtil;
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;

 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
@@ -4,9 +4,11 @@ import cn.hutool.core.convert.Convert;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.*;
+import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.enums.DnsType;
-import com.zdjizhi.etl.*;
+import com.zdjizhi.etl.CKBatchWindow;
+import com.zdjizhi.etl.connection.*;
+import com.zdjizhi.etl.dns.*;
 import com.zdjizhi.utils.arangodb.ArangoDBSink;
 import com.zdjizhi.utils.ck.ClickhouseSink;
 import com.zdjizhi.utils.kafka.KafkaConsumer;
@@ -49,7 +51,7 @@ public class LogFlowWriteTopology {
 //transform
 DataStream<Map<String, Object>> connTransformStream = connSource
 .assignTimestampsAndWatermarks(WatermarkStrategy
-.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
 .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
 .keyBy(new IpKeysSelector())
 .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
@@ -58,7 +60,7 @@ public class LogFlowWriteTopology {
 .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);

 DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
-.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
 .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
 .keyBy(new IpKeysSelector())
 .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
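Renaming FLINK_WATERMARK_MAX_ORDERNESS to FLINK_WATERMARK_MAX_DELAY_TIME (together with its property key) does not change the watermark semantics: the streams still use a bounded-out-of-orderness strategy driven by an epoch-seconds field. A minimal sketch of that pattern against the Flink 1.13 API; the class name and the hard-coded field name are illustrative.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import java.time.Duration;
import java.util.Map;

// Sketch: bounded out-of-orderness watermarks fed by the renamed config value.
public class WatermarkSketch {
    public static WatermarkStrategy<Map<String, Object>> strategy(int maxDelaySeconds) {
        return WatermarkStrategy
                .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(maxDelaySeconds))
                // event time comes from an epoch-seconds field, converted to milliseconds
                .withTimestampAssigner((event, ts) -> ((Number) event.get("conn_start_time")).longValue() * 1000);
    }
}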
@@ -72,16 +74,16 @@ public class LogFlowWriteTopology {
 .keyBy(new IpKeysSelector())
 .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
 .process(new Ip2IpGraphProcessFunction())
-// .filter(Objects::nonNull)
+.filter(Objects::nonNull)
 .setParallelism(TRANSFORM_PARALLELISM);

 // write to the CK sink in batches
-connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
-sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
-sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
+sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
+sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");

 // write to arangodb
-ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
+ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);

 } else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {

@@ -92,7 +94,7 @@ public class LogFlowWriteTopology {
 .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);

 DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
-.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_ORDERNESS))
+.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
 .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
 .flatMap(new DnsSplitFlatMapFunction())
 .keyBy(new DnsGraphKeysSelector())
@@ -104,13 +106,13 @@ public class LogFlowWriteTopology {
 // dns raw logs into ClickHouse
 dnsSource.filter(Objects::nonNull)
 .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
+.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
 .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
 .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
 .name("CKSink");

 // dns relation logs (after splitting) into ClickHouse
-dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
+dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
 .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
 .setParallelism(SINK_PARALLELISM)
 .name("CKSink");
@@ -119,12 +121,11 @@ public class LogFlowWriteTopology {
 DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
 .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
 .process(new DnsGraphProcessFunction())
-.setParallelism(SINK_PARALLELISM)
-.filter(Objects::nonNull);
+.setParallelism(SINK_PARALLELISM);

 for (DnsType dnsEnum : DnsType.values()) {
 dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
-.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbDnsWindow())
+.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchDnsWindow())
 .addSink(new ArangoDBSink(dnsEnum.getSink()))
 .setParallelism(SINK_PARALLELISM)
 .name("ArangodbSink");
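CKWindow, ArangodbIPWindow and ArangodbDnsWindow are renamed to their *Batch* variants here, but their role is unchanged: an AllWindowFunction that collects one SINK_BATCH_DELAY_TIME window's elements into a single list so the ClickHouse and ArangoDB sinks receive batches instead of single records. A minimal sketch of that pattern (Flink 1.13 API; the class name is illustrative).

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch: gather everything that arrived in the window into one list for the batch sink.
public class BatchWindowSketch
        implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {

    @Override
    public void apply(TimeWindow window, Iterable<Map<String, Object>> values,
                      Collector<List<Map<String, Object>>> out) {
        List<Map<String, Object>> batch = new ArrayList<>();
        values.forEach(batch::add);
        if (!batch.isEmpty()) {
            out.collect(batch);   // one list per SINK_BATCH_DELAY_TIME window
        }
    }
}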
@@ -1,124 +0,0 @@
-package com.zdjizhi.utils.app;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.http.HttpClientUtil;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
-* AppId utility class
-*
-* @author qidaijie
-*/
-@Deprecated
-public class AppUtils {
-private static final Log logger = LogFactory.get();
-private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
-private static AppUtils appUtils;
-
-private static void getAppInstance() {
-appUtils = new AppUtils();
-}
-
-/**
-* Constructor (new)
-*/
-private AppUtils() {
-// scheduled refresh
-updateAppIdCache();
-}
-
-/**
-* Update the cached mapping
-*/
-private static void change() {
-if (appUtils == null) {
-getAppInstance();
-}
-timestampsFilter();
-}
-
-/**
-* Fetch the changed content
-*/
-private static void timestampsFilter() {
-try {
-Long begin = System.currentTimeMillis();
-String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
-if (StringUtil.isNotBlank(schema)) {
-String data = JSONObject.parseObject(schema).getString("data");
-JSONArray objects = JSONArray.parseArray(data);
-for (Object object : objects) {
-JSONArray jsonArray = JSONArray.parseArray(object.toString());
-int key = jsonArray.getInteger(0);
-String value = jsonArray.getString(1);
-if (appIdMap.containsKey(key)) {
-if (!value.equals(appIdMap.get(key))) {
-appIdMap.put(key, value);
-}
-} else {
-appIdMap.put(key, value);
-}
-}
-logger.warn("Updating the correspondence takes time:" + (begin - System.currentTimeMillis()));
-logger.warn("Pull the length of the interface data:[" + objects.size() + "]");
-}
-} catch (RuntimeException e) {
-logger.error("Update cache app-id failed, exception:" + e);
-}
-}
-
-/**
-* Validation timer: runs at a fixed interval to re-validate and fetch a new Cookie
-*/
-private void updateAppIdCache() {
-ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
-executorService.scheduleAtFixedRate(new Runnable() {
-@Override
-public void run() {
-try {
-if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) {
-change();
-}
-} catch (RuntimeException e) {
-logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
-}
-}
-}, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
-}
-
-/**
-* Get the appName
-*
-* @param appId app_id
-* @return account
-*/
-public static String getAppName(int appId) {
-
-if (appUtils == null) {
-getAppInstance();
-}
-
-if (appIdMap.containsKey(appId)) {
-return appIdMap.get(appId);
-} else {
-logger.warn("AppMap get appName is null, ID is :" + appId);
-return "";
-}
-}
-
-}
@@ -17,9 +17,8 @@ public class ArangoDBSink extends RichSinkFunction<List<BaseEdgeDocument>> {
 private String collection;

 @Override
-public void invoke(List<BaseEdgeDocument> BaseEdgeDocuments, Context context) throws Exception {
-arangoDBConnect.overwrite(BaseEdgeDocuments, getCollection());
+public void invoke(List<BaseEdgeDocument> baseEdgeDocuments, Context context) throws Exception {
+arangoDBConnect.overwrite(baseEdgeDocuments, getCollection());
 }

 @Override
@@ -47,3 +46,4 @@ public class ArangoDBSink extends RichSinkFunction<List<BaseEdgeDocument>> {
 }

 }
+
@@ -29,7 +29,6 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
 private PreparedStatement preparedStatement;
 public String sink;

-
 public ClickhouseSink(String sink) {
 this.sink = sink;
 }
@@ -49,7 +48,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>

 @Override
 public void open(Configuration parameters) throws Exception {
+super.open(parameters);
 try {
 // Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
 // connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
@@ -64,8 +63,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
 BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
 dataSource.scheduleActualization(10, TimeUnit.SECONDS); // enable connection health checks
 connection = dataSource.getConnection();
-log.info("get clickhouse connection success");
+log.debug("get clickhouse connection success");
 } catch (SQLException e) {
 log.error("clickhouse connection error ,{}", e);
 }
@@ -102,7 +100,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
 preparedStatement.addBatch();
 count++;
 // commit once every 10k rows
-if (count % SINK_BATCH == 0) {
+if (count % CK_BATCH == 0) {
 preparedStatement.executeBatch();
 connection.commit();
 preparedStatement.clearBatch();
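With SINK_BATCH renamed to CK_BATCH the flush logic is unchanged: rows are added to a JDBC batch and committed every CK_BATCH rows. A minimal, self-contained sketch of that loop against plain java.sql, assuming auto-commit is disabled on the connection; the class and parameter names are illustrative, not the project's exact code.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

// Sketch: batched inserts, flushed every ckBatch rows.
public class CkBatchInsertSketch {
    public static void write(Connection connection, PreparedStatement ps,
                             List<Map<String, Object>> rows, int ckBatch) throws SQLException {
        int count = 0;
        for (Map<String, Object> row : rows) {
            // ... bind the row's values to ps here ...
            ps.addBatch();
            count++;
            if (count % ckBatch == 0) {   // flush every ckBatch rows, like CK_BATCH in this commit
                ps.executeBatch();
                connection.commit();
                ps.clearBatch();
            }
        }
        ps.executeBatch();                // flush the remainder
        connection.commit();
    }
}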
@@ -113,11 +113,6 @@ public class TransFormMap {
 JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
 }
 break;
-case "app_match":
-if (logValue != null && appendTo == null) {
-JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
-}
-break;
 default:
 }
 }
@@ -115,11 +115,6 @@ public class TransFormTypeMap {
 JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
 }
 break;
-case "app_match":
-if (logValue != null && appendToKeyValue == null) {
-JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
-}
-break;
 default:
 }
 }
@@ -1,7 +1,6 @@
 package com.zdjizhi.utils.general;

 import cn.hutool.core.codec.Base64;
-import cn.hutool.core.util.StrUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.jayway.jsonpath.InvalidPathException;
@@ -10,7 +9,6 @@ import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.utils.FormatUtils;
 import com.zdjizhi.utils.IpLookupV2;
 import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.app.AppUtils;
 import com.zdjizhi.utils.json.JsonParseUtil;

 import java.math.BigInteger;
@@ -126,22 +124,6 @@ class TransFunction {
 * @return account
 */

-/**
-* Complete appName from the appId-to-name mapping in the cache
-*
-* @param appIds app id list
-* @return appName
-*/
-@Deprecated
-static String appMatch(String appIds) {
-try {
-String appId = StrUtil.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
-return AppUtils.getAppName(Integer.parseInt(appId));
-} catch (NumberFormatException | ClassCastException exception) {
-logger.error("Failed to split/convert the APP ID list, offending list: " + appIds);
-return "";
-}
-}
-
 /**
 * Parse the top-level domain
@@ -14,10 +14,10 @@ import java.util.Properties;
 * @date 2021/6/813:54
 */
 public class KafkaConsumer {
-private static Properties createConsumerConfig() {
+private static Properties createConsumerConfig(String topic) {
 Properties properties = new Properties();
 properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
-properties.put("group.id", FlowWriteConfig.GROUP_ID);
+properties.put("group.id", FlowWriteConfig.GROUP_ID + "-" + topic);
 properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
 properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
 properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
@@ -33,9 +33,9 @@ public class KafkaConsumer {
 * @return kafka logs -> map
 */
 @SuppressWarnings("unchecked")
-public static FlinkKafkaConsumer<Map<String,Object>> myDeserializationConsumer(String topic) {
-FlinkKafkaConsumer<Map<String,Object>> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
-new TimestampDeserializationSchema(), createConsumerConfig());
+public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer(String topic) {
+FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+new TimestampDeserializationSchema(), createConsumerConfig(topic));

 // commit offsets to Kafka as checkpoints complete
 kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
@@ -51,9 +51,9 @@ public class KafkaConsumer {
 *
 * @return kafka logs
 */
-public static FlinkKafkaConsumer<String> flinkConsumer() {
-FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
-new SimpleStringSchema(), createConsumerConfig());
+public static FlinkKafkaConsumer<String> flinkConsumer(String topic) {
+FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+new SimpleStringSchema(), createConsumerConfig(topic));

 // commit offsets to Kafka as checkpoints complete
 kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
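Passing the topic into createConsumerConfig gives every source its own consumer group (GROUP_ID + "-" + topic), so the connection, sketch and dns consumers no longer share offsets under the new knowledge-group id. A minimal sketch of the resulting setup with the flink-connector-kafka API used in this commit; the class and parameter names are illustrative.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

// Sketch: per-topic consumer group so each source tracks its own offsets.
public class PerTopicConsumerSketch {
    public static FlinkKafkaConsumer<String> consumer(String servers, String baseGroupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", baseGroupId + "-" + topic);   // e.g. knowledge-group-connection_record_log
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true);        // commit offsets when checkpoints complete
        return consumer;
    }
}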
@@ -33,9 +33,9 @@ public class KafkaProducer {
 }


-public static FlinkKafkaProducer<String> getKafkaProducer() {
+public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
 FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
-FlowWriteConfig.SINK_KAFKA_TOPIC,
+topic,
 new SimpleStringSchema(),
 createProducerConfig(),
 // the sink connects to every partition and writes round-robin