Optimize code: add a batch size parameter for database ingestion

zhanghongqing
2022-07-18 18:08:47 +08:00
parent 95eefbd8b7
commit fa3f628658
8 changed files with 246 additions and 467 deletions

View File

@@ -1,64 +1,59 @@
#--------------------------------Address configuration------------------------------#
#management kafka address
source.kafka.servers=192.168.45.102:9092
#management kafka address; for multiple brokers, join with commas: ip1:9094,ip2:9094
source.kafka.servers=192.168.45.102:9094
#management output kafka address
sink.kafka.servers=192.168.45.102:9092
#zookeeper address, used to configure log_id
zookeeper.servers=192.168.45.102:2181
#--------------------------------HTTP/location database------------------------------#
#location database path
tools.library=D:\\workerspace\\dat\\
sink.kafka.servers=
#--------------------------------HTTP/location database/ssl------------------------------#
tools.library=
#--------------------------------Kafka consumer/producer configuration------------------------------#
#consumer group for reading topics; stores the consumer offsets for this spout id (can be named after the topology), which determine where reading resumes so data is not re-read
group.id=knowledge-group
group.id=KNOWLEDGE-GROUP
#--------------------------------topology configuration------------------------------#
#consumer parallelism
source.parallelism=1
source.parallelism=12
#transform function parallelism
transform.parallelism=1
transform.parallelism=12
#kafka producer parallelism
sink.parallelism=1
#data center id, valid range (0-31)
data.center.id.num=0
sink.parallelism=12
#--------------------------------Default value configuration------------------------------#
#--------------------------------Business configuration------------------------------#
#1: connection logs, 2: dns logs
log.need.complete=2
log.type=1
#producer compression mode: none or snappy
producer.kafka.compression.type=none
source.kafka.topic.connection=connection_record_log
source.kafka.topic.sketch=connection_sketch_record_log
source.kafka.topic.dns=dns_record_log
#kafka source topics
source.kafka.topic.connection=CONNECTION-RECORD-LOG
source.kafka.topic.sketch=CONNECTION-SKETCH-RECORD-LOG
source.kafka.topic.dns=DNS-RECORD-LOG
#clickhouse local tables to write to
sink.ck.table.connection=connection_record_log_local
sink.ck.table.sketch=connection_sketch_record_log_local
sink.ck.table.dns=dns_record_log_local
sink.ck.table.relation.connection=connection_relation_log_local
sink.ck.table.relation.dns=dns_relation_log_local
#arangodb collections to write to
sink.arangodb.table.r.visit.ip2ip=R_VISIT_IP2IP
sink.arangodb.table.r.cname.domain2domain=R_CNAME_DOMAIN2DOMAIN
sink.arangodb.table.r.mx.domain2domain=R_MX_DOMAIN2DOMAIN
sink.arangodb.table.r.resolve.domain2ip=R_RESOLVE_DOMAIN2IP
sink.arangodb.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN
sink.arango.table.r.visit.ip2ip=R_VISIT_IP2IP
sink.arango.table.r.cname.domain2domain=R_CNAME_DOMAIN2DOMAIN
sink.arango.table.r.mx.domain2domain=R_MX_DOMAIN2DOMAIN
sink.arango.table.r.resolve.domain2ip=R_RESOLVE_DOMAIN2IP
sink.arango.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN
#clickhouse ingestion
#ingest raw logs with flink, or install gohangout for ingestion instead; 0: no, 1: yes
sink.ck.raw.log.insert.open=1
#clickhouse hosts; for multiple hosts, join with commas: ip1:8123,ip2:8123
ck.hosts=192.168.45.102:8123,192.168.45.102:8123
ck.database=tsg_galaxy_v3
ck.username=default
ck.pin=galaxy2019
#unit: milliseconds
#timeout, in milliseconds
ck.connection.timeout=10000
ck.socket.timeout=300000
ck.batch=10000
#clickhouse ingestion batch size, in rows
ck.batch=10
#time to accumulate a batch before clickhouse ingestion, in milliseconds
sink.ck.batch.delay.time=1000
#flink log lateness timeout (max watermark delay)
flink.watermark.max.delay.time=50
@@ -68,13 +63,13 @@ log.aggregate.duration=5
log.aggregate.duration.graph=5
#arangoDB parameter configuration
arangoDB.host=192.168.45.102
arangoDB.port=8529
arangoDB.user=root
arangoDB.password=galaxy_2019
arangoDB.DB.name=knowledge
arangoDB.batch=100000
arangoDB.ttl=3600
thread.pool.number=10
sink.batch.delay.time=5
arangodb.host=192.168.45.102
arangodb.port=8529
arangodb.user=root
arangodb.password=galaxy_2019
arangodb.db.name=knowledge
arangodb.batch=100000
arangodb.ttl=3600
arangodb.thread.pool.number=10
#batch accumulation time, in milliseconds
sink.arangodb.batch.delay.time=5
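The two new batching knobs work as a pair: a ClickHouse batch is intended to flush as soon as ck.batch rows have accumulated, or once sink.ck.batch.delay.time milliseconds have passed, whichever comes first (arangodb.batch and sink.arangodb.batch.delay.time follow the same pattern). A minimal sketch of the flush condition, with hypothetical variable names:

// flush when either the row threshold or the accumulation window is hit
boolean flush = bufferedRows >= ckBatch              // ck.batch
             || elapsedMillis >= sinkCkBatchDelay;   // sink.ck.batch.delay.time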

View File

@@ -44,8 +44,8 @@ public class FlowWriteConfig {
public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism");
public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
public static final Integer DATA_CENTER_ID_NUM = 0;
public static final Integer LOG_TYPE = FlowWriteConfigurations.getIntProperty(0, "log.type");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset");
public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
@@ -85,7 +85,7 @@ public class FlowWriteConfig {
*/
public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
public static final String ZOOKEEPER_SERVERS = "zookeeper.servers";
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
@@ -114,22 +114,24 @@ public class FlowWriteConfig {
public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");
public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.visit.ip2ip");
public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.cname.domain2domain");
public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.mx.domain2domain");
public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.resolve.domain2ip");
public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.nx.domain2domain");
public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.visit.ip2ip");
public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.cname.domain2domain");
public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.mx.domain2domain");
public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.resolve.domain2ip");
public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.nx.domain2domain");
public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user");
public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangoDB.password");
public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name");
public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl");
public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch");
public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number");
public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangodb.host");
public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangodb.port");
public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangodb.user");
public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangodb.password");
public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangodb.db.name");
public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangodb.ttl");
public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangodb.batch");
public static final Integer ARANGODB_THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "arangodb.thread.pool.number");
public static final Integer SINK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.batch.delay.time");
public static final Integer SINK_CK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.ck.batch.delay.time");
public static final Integer SINK_ARANGODB_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.arangodb.batch.delay.time");
public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
public static final Integer SINK_CK_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.ck.raw.log.insert.open");
}

View File

@@ -0,0 +1,132 @@
package com.zdjizhi.etl;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
* Count window trigger with a timeout
*/
public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
private static Log logger = LogFactory.get();
/**
* Maximum number of elements in the window
*/
private int maxCount;
/**
* event time / process time
*/
private TimeCharacteristic timeType;
private String stateName;
public String getStateName() {
return stateName;
}
public void setStateName(String stateName) {
this.stateName = stateName;
}
public CountTriggerWithTimeout(String stateName) {
this.stateName = stateName;
}
/**
* State descriptor that stores the current element count of the window
*/
private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor(getStateName() + "counter", new Sum(), LongSerializer.INSTANCE);
public CountTriggerWithTimeout(String stateName, int maxCount, TimeCharacteristic timeType) {
this.maxCount = maxCount;
this.timeType = timeType;
this.stateName = stateName;
}
private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
clear(window, ctx);
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
countState.add(1L);
if (countState.get() >= maxCount) {
logger.info("fire with count: " + countState.get());
return fireAndPurge(window, ctx);
}
if (timestamp >= window.getEnd()) {
logger.info("fire with tiem: " + timestamp);
return fireAndPurge(window, ctx);
} else {
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
if (timeType != TimeCharacteristic.ProcessingTime) {
return TriggerResult.CONTINUE;
}
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
logger.info("fire with process tiem: " + time);
return fireAndPurge(window, ctx);
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
if (timeType != TimeCharacteristic.EventTime) {
return TriggerResult.CONTINUE;
}
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
logger.info("fire with event tiem: " + time);
return fireAndPurge(window, ctx);
}
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
countState.clear();
}
/**
* Sum reduce function used for counting
*/
class Sum implements ReduceFunction<Long> {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}
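A minimal sketch of how this trigger is attached to a non-keyed stream, mirroring the usage in LogFlowWriteTopology further down in this commit; the stream variable `rows` and the table name `example_table` are placeholders:

rows.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(FlowWriteConfig.SINK_CK_BATCH_DELAY_TIME)))
    .trigger(new CountTriggerWithTimeout<>("example_table", FlowWriteConfig.CK_BATCH, TimeCharacteristic.ProcessingTime))
    .apply(new CKBatchWindow())
    .addSink(new ClickhouseSink("example_table"))
    .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
    .name("CKSink");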

View File

@@ -7,12 +7,14 @@ import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.enums.DnsType;
import com.zdjizhi.etl.CKBatchWindow;
import com.zdjizhi.etl.CountTriggerWithTimeout;
import com.zdjizhi.etl.connection.*;
import com.zdjizhi.etl.dns.*;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.ClickhouseSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
@@ -35,9 +37,10 @@ public class LogFlowWriteTopology {
env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
//1: connection, 2: dns
if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
if (FlowWriteConfig.LOG_TYPE == 1) {
//connection
DataStream<Map<String, Object>> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION))
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.filter(Objects::nonNull)
.setParallelism(SOURCE_PARALLELISM)
.name(SOURCE_KAFKA_TOPIC_CONNECTION);
@@ -53,9 +56,11 @@ public class LogFlowWriteTopology {
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM)
.filter(x -> Objects.nonNull(x))
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
@@ -74,20 +79,43 @@ public class LogFlowWriteTopology {
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
//write to the ClickHouse sink in batches
connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_SKETCH, CK_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
}
sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
//write to arangodb
ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
} else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(R_VISIT_IP2IP, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new ArangodbBatchIPWindow())
.addSink(new ArangoDBSink(R_VISIT_IP2IP))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM).name(R_VISIT_IP2IP);
} else if (FlowWriteConfig.LOG_TYPE == 2) {
DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
.filter(Objects::nonNull)
.map(new DnsMapFunction())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
@@ -96,23 +124,30 @@ public class LogFlowWriteTopology {
DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
.setParallelism(TRANSFORM_PARALLELISM)
.flatMap(new DnsSplitFlatMapFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new DnsRelationProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
//ingest raw dns logs into ClickHouse
dnsSource.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
dnsSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
}
//ingest the split dns relation logs into ClickHouse
dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
.setParallelism(SINK_PARALLELISM)
.name("CKSink");
@@ -125,7 +160,10 @@ public class LogFlowWriteTopology {
for (DnsType dnsEnum : DnsType.values()) {
dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchDnsWindow())
.setParallelism(SINK_PARALLELISM)
.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(dnsEnum.getSink(), ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new ArangodbBatchDnsWindow())
.addSink(new ArangoDBSink(dnsEnum.getSink()))
.setParallelism(SINK_PARALLELISM)
.name("ArangodbSink");

View File

@@ -29,7 +29,7 @@ public class ArangoDBConnect {
private static void getArangoDB() {
arangoDB = new ArangoDB.Builder()
.maxConnections(FlowWriteConfig.THREAD_POOL_NUMBER)
.maxConnections(FlowWriteConfig.ARANGODB_THREAD_POOL_NUMBER)
.host(FlowWriteConfig.ARANGODB_HOST, FlowWriteConfig.ARANGODB_PORT)
.user(FlowWriteConfig.ARANGODB_USER)
.password(FlowWriteConfig.ARANGODB_PASSWORD)
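Only part of the builder chain appears in this hunk; presumably it ends with build() as in the ArangoDB Java driver's usual setup. A sketch under that assumption:

arangoDB = new ArangoDB.Builder()
        .maxConnections(FlowWriteConfig.ARANGODB_THREAD_POOL_NUMBER)   // bounded by arangodb.thread.pool.number
        .host(FlowWriteConfig.ARANGODB_HOST, FlowWriteConfig.ARANGODB_PORT)
        .user(FlowWriteConfig.ARANGODB_USER)
        .password(FlowWriteConfig.ARANGODB_PASSWORD)
        .build();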

View File

@@ -1,335 +0,0 @@
/*
package com.zdjizhi.utils.ck;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDriver;
import ru.yandex.clickhouse.ClickhouseJdbcUrlParser;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import javax.sql.DataSource;
import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
*/
/**
* DataSource implementation that provides load balancing across ClickHouse urls
*//*
public class BalancedClickhouseDataSource implements DataSource {
private static final Logger log = LoggerFactory.getLogger(BalancedClickhouseDataSource.class);
private static final Pattern URL_TEMPLATE = Pattern.compile("jdbc:clickhouse://([a-zA-Z0-9_:,.-]+)(/[a-zA-Z0-9_]+([?][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+([&][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+)*)?)?");
private PrintWriter printWriter;
private int loginTimeoutSeconds;
//per-thread random number generator
private final ThreadLocal<Random> randomThreadLocal;
//all urls
private final List<String> allUrls;
//currently enabled urls
private volatile List<String> enabledUrls;
private final ClickHouseProperties properties;
private final ClickHouseDriver driver;
public BalancedClickhouseDataSource(String url) {
this(splitUrl(url), getFromUrl(url));
}
public BalancedClickhouseDataSource(String url, Properties properties) {
this(splitUrl(url), new ClickHouseProperties(properties));
}
public BalancedClickhouseDataSource(String url, ClickHouseProperties properties) {
this(splitUrl(url), properties.merge(getFromUrlWithoutDefault(url)));
}
private BalancedClickhouseDataSource(List<String> urls) {
this(urls, new ClickHouseProperties());
}
private BalancedClickhouseDataSource(List<String> urls, Properties info) {
this(urls, new ClickHouseProperties(info));
}
private BalancedClickhouseDataSource(List<String> urls, ClickHouseProperties properties) {
this.loginTimeoutSeconds = 0;
this.randomThreadLocal = new ThreadLocal();
this.driver = new ClickHouseDriver();
if (urls.isEmpty()) {
throw new IllegalArgumentException("Incorrect ClickHouse jdbc url list. It must be not empty");
} else {
try {
//parse the configuration
ClickHouseProperties localProperties = ClickhouseJdbcUrlParser.parse((String)urls.get(0), properties.asProperties());
localProperties.setHost((String)null);
localProperties.setPort(-1);
this.properties = localProperties;
} catch (URISyntaxException var8) {
throw new IllegalArgumentException(var8);
}
List<String> allUrls = new ArrayList(urls.size());
Iterator var4 = urls.iterator();
while(var4.hasNext()) {
String url = (String)var4.next();
try {
//if the url is valid
if (this.driver.acceptsURL(url)) {
//add it to the list of all urls
allUrls.add(url);
} else {
log.error("that url is has not correct format: {}", url);
}
} catch (SQLException var7) {
throw new IllegalArgumentException("error while checking url: " + url, var7);
}
}
if (allUrls.isEmpty()) {
throw new IllegalArgumentException("there are no correct urls");
} else {
//all urls
this.allUrls = Collections.unmodifiableList(allUrls);
//enabled urls
this.enabledUrls = this.allUrls;
}
}
}
*/
/**
* Split a comma-separated jdbc url into one url per host
* @param url
* @return
*//*
static List<String> splitUrl(String url) {
//validate the url
Matcher m = URL_TEMPLATE.matcher(url);
if (!m.matches()) {
throw new IllegalArgumentException("Incorrect url");
} else {
String database = m.group(2);
if (database == null) {
database = "";
}
//split the host part of the url
String[] hosts = m.group(1).split(",");
List<String> result = new ArrayList(hosts.length);
String[] var5 = hosts;
int var6 = hosts.length;
//iterate over the hosts and add one url per host
for(int var7 = 0; var7 < var6; ++var7) {
String host = var5[var7];
result.add("jdbc:clickhouse://" + host + database);
}
return result;
}
}
*/
/**
* Ping a url to check whether it is reachable
* @param url
* @return
*//*
private boolean ping(String url) {
try {
//run a trivial sql statement to test that the connection works
this.driver.connect(url, this.properties).createStatement().execute("SELECT 1");
return true;
} catch (Exception var3) {
return false;
}
}
*/
/**
* Ping every url and keep only the ones that are currently reachable
* @return
*//*
public synchronized int actualize() {
//build a new list of enabled urls
List<String> enabledUrls = new ArrayList(this.allUrls.size());
Iterator var2 = this.allUrls.iterator();
while(var2.hasNext()) {
String url = (String)var2.next();
log.debug("Pinging disabled url: {}", url);
if (this.ping(url)) {
log.debug("Url is alive now: {}", url);
//only add urls that respond to ping
enabledUrls.add(url);
} else {
log.debug("Url is dead now: {}", url);
}
}
//replace the enabled url list
this.enabledUrls = Collections.unmodifiableList(enabledUrls);
return enabledUrls.size();
}
*/
/**
* Return a randomly chosen enabled url
* @return
* @throws java.sql.SQLException
*//*
private String getAnyUrl() throws SQLException {
//enabled url list
List<String> localEnabledUrls = this.enabledUrls;
if (localEnabledUrls.isEmpty()) {
throw new SQLException("Unable to get connection: there are no enabled urls");
} else {
Random random = (Random)this.randomThreadLocal.get();
if (random == null) {
this.randomThreadLocal.set(new Random());
//re-fetch the random generator just created for this thread
random = (Random)this.randomThreadLocal.get();
}
int index = random.nextInt(localEnabledUrls.size());
//pick an enabled url at random and return it
return (String)localEnabledUrls.get(index);
}
}
public ClickHouseConnection getConnection() throws SQLException {
return this.driver.connect(this.getAnyUrl(), this.properties);
}
public ClickHouseConnection getConnection(String username, String password) throws SQLException {
return this.driver.connect(this.getAnyUrl(), this.properties.withCredentials(username, password));
}
public <T> T unwrap(Class<T> iface) throws SQLException {
if (iface.isAssignableFrom(this.getClass())) {
return iface.cast(this);
} else {
throw new SQLException("Cannot unwrap to " + iface.getName());
}
}
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return iface.isAssignableFrom(this.getClass());
}
public PrintWriter getLogWriter() throws SQLException {
return this.printWriter;
}
public void setLogWriter(PrintWriter printWriter) throws SQLException {
this.printWriter = printWriter;
}
public void setLoginTimeout(int seconds) throws SQLException {
this.loginTimeoutSeconds = seconds;
}
public int getLoginTimeout() throws SQLException {
return this.loginTimeoutSeconds;
}
public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
throw new SQLFeatureNotSupportedException();
}
*/
/**
* Periodically clean up idle url connections
* @param rate
* @param timeUnit
* @return
*//*
public BalancedClickhouseDataSource withConnectionsCleaning(int rate, TimeUnit timeUnit) {
this.driver.scheduleConnectionsCleaning(rate, timeUnit);
return this;
}
*/
/**
* Periodically re-check urls via a scheduled task so the enabled url list stays up to date
* @param delay
* @param timeUnit
* @return
*//*
public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {
ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
BalancedClickhouseDataSource.this.actualize();
} catch (Exception var2) {
BalancedClickhouseDataSource.log.error("Unable to actualize urls", var2);
}
}
}, 0L, (long)delay, timeUnit);
return this;
}
public List<String> getAllClickhouseUrls() {
return this.allUrls;
}
public List<String> getEnabledClickHouseUrls() {
return this.enabledUrls;
}
*/
/**
* Return the set of disabled urls,
* computed as the difference between all urls and the enabled urls
*
* @return
*//*
public List<String> getDisabledUrls() {
List<String> enabledUrls = this.enabledUrls;
if (!this.hasDisabledUrls()) {
return Collections.emptyList();
} else {
List<String> disabledUrls = new ArrayList(this.allUrls);
disabledUrls.removeAll(enabledUrls);
return disabledUrls;
}
}
public boolean hasDisabledUrls() {
return this.allUrls.size() != this.enabledUrls.size();
}
public ClickHouseProperties getProperties() {
return this.properties;
}
private static ClickHouseProperties getFromUrl(String url) {
return new ClickHouseProperties(getFromUrlWithoutDefault(url));
}
private static Properties getFromUrlWithoutDefault(String url) {
if (StringUtils.isBlank(url)) {
return new Properties();
} else {
int index = url.indexOf("?");
return index == -1 ? new Properties() : ClickhouseJdbcUrlParser.parseUriQueryPart(url.substring(index + 1), new Properties());
}
}
}*/

View File

@@ -50,9 +50,8 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
public void open(Configuration parameters) throws Exception {
super.open(parameters);
try {
// Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
// connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
ClickHouseProperties properties = new ClickHouseProperties();
properties.setDatabase(CK_DATABASE);
properties.setUser(CK_USERNAME);
properties.setPassword(CK_PIN);
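The hunk ends mid-method; presumably open() continues the way the removed ClickhouseUtil (below) did, building a load-balanced datasource from the same properties. A sketch under that assumption:

properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
properties.setSocketTimeout(CK_SOCKET_TIMEOUT);
// one jdbc url per host; BalancedClickhouseDataSource picks a reachable one
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
connection = dataSource.getConnection();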

View File

@@ -1,52 +0,0 @@
package com.zdjizhi.utils.ck;
import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import java.sql.Connection;
import java.sql.SQLException;
import static com.zdjizhi.common.FlowWriteConfig.*;
/**
* @description:
* @author: zhq
* @create: 2022-07-10
**/
public class ClickhouseUtil {
private static final Log log = LogFactory.get();
private static Connection connection;
public static Connection getConnection() {
try {
// Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
// connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
ClickHouseProperties properties = new ClickHouseProperties();
properties.setDatabase(CK_DATABASE);
properties.setUser(CK_USERNAME);
properties.setPassword(CK_PIN);
properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
properties.setSocketTimeout(CK_SOCKET_TIMEOUT);
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
connection = dataSource.getConnection();
log.info("get clickhouse connection success");
return connection;
} catch (SQLException e) {
log.error("clickhouse connection error ,{}", e);
}
return null;
}
public static void close() {
IoUtil.close(connection);
}
}
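For context, a batched insert over a connection like the one this removed helper returned looks roughly as follows; the table and column names and the `rows` list (a List&lt;Map&lt;String, Object&gt;&gt;) are placeholders, and CK_BATCH comes from the new ck.batch setting:

try (PreparedStatement ps = connection.prepareStatement("INSERT INTO example_local (col_a, col_b) VALUES (?, ?)")) {
    int buffered = 0;
    for (Map<String, Object> row : rows) {
        ps.setObject(1, row.get("col_a"));
        ps.setObject(2, row.get("col_b"));
        ps.addBatch();
        if (++buffered >= FlowWriteConfig.CK_BATCH) {   // flush once ck.batch rows accumulate
            ps.executeBatch();
            buffered = 0;
        }
    }
    if (buffered > 0) {
        ps.executeBatch();   // flush the remainder
    }
}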