diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 4de148b..7fef174 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,64 +1,59 @@
 #--------------------------------Address configuration------------------------------#
-#Management Kafka address
-source.kafka.servers=192.168.45.102:9092
+#Management Kafka address; join multiple brokers with commas: ip1:9094,ip2:9094
+source.kafka.servers=192.168.45.102:9094
 #Management output Kafka address
-sink.kafka.servers=192.168.45.102:9092
-
-#ZooKeeper address, used to configure log_id
-zookeeper.servers=192.168.45.102:2181
-
-#--------------------------------HTTP/location library------------------------------#
-#Location library path
-tools.library=D:\\workerspace\\dat\\
-
+sink.kafka.servers=
+#--------------------------------HTTP/location library/ssl------------------------------#
+tools.library=
 #--------------------------------Kafka consumer/producer configuration------------------------------#
 #Topic to read; stores the consumer offset information for this spout id and can be named after the topology; it determines where offsets are kept so that the next read does not re-consume data;
-group.id=knowledge-group
-
+group.id=KNOWLEDGE-GROUP
 #--------------------------------topology configuration------------------------------#
-
 #consumer parallelism
-source.parallelism=1
+source.parallelism=12
 #transform function parallelism
-transform.parallelism=1
+transform.parallelism=12
 #kafka producer parallelism
-sink.parallelism=1
-#data center id, value range (0-31)
-data.center.id.num=0
+sink.parallelism=12
 
-#--------------------------------Default value configuration------------------------------#
+#--------------------------------Business configuration------------------------------#
 #1: connection logs, 2: dns logs
-log.need.complete=2
+log.type=1
 #producer compression mode, none or snappy
 producer.kafka.compression.type=none
-
-source.kafka.topic.connection=connection_record_log
-source.kafka.topic.sketch=connection_sketch_record_log
-source.kafka.topic.dns=dns_record_log
-
+#kafka source topics
+source.kafka.topic.connection=CONNECTION-RECORD-LOG
+source.kafka.topic.sketch=CONNECTION-SKETCH-RECORD-LOG
+source.kafka.topic.dns=DNS-RECORD-LOG
+#clickhouse local tables to write to
 sink.ck.table.connection=connection_record_log_local
 sink.ck.table.sketch=connection_sketch_record_log_local
 sink.ck.table.dns=dns_record_log_local
 sink.ck.table.relation.connection=connection_relation_log_local
 sink.ck.table.relation.dns=dns_relation_log_local
+#arangodb collections to write to
+sink.arangodb.table.r.visit.ip2ip=R_VISIT_IP2IP
+sink.arangodb.table.r.cname.domain2domain=R_CNAME_DOMAIN2DOMAIN
+sink.arangodb.table.r.mx.domain2domain=R_MX_DOMAIN2DOMAIN
+sink.arangodb.table.r.resolve.domain2ip=R_RESOLVE_DOMAIN2IP
+sink.arangodb.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN
 
-sink.arango.table.r.visit.ip2ip=R_VISIT_IP2IP
-sink.arango.table.r.cname.domain2domain=R_CNAME_DOMAIN2DOMAIN
-sink.arango.table.r.mx.domain2domain=R_MX_DOMAIN2DOMAIN
-sink.arango.table.r.resolve.domain2ip=R_RESOLVE_DOMAIN2IP
-sink.arango.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN
-
-#clickhouse ingestion
+#Use Flink to write the raw logs to ClickHouse, or install gohangout for that instead. 0: no, 1: yes
+sink.ck.raw.log.insert.open=1
+#clickhouse hosts; join multiple with commas: ip1:8123,ip2:8123
 ck.hosts=192.168.45.102:8123,192.168.45.102:8123
 ck.database=tsg_galaxy_v3
 ck.username=default
 ck.pin=galaxy2019
-#in milliseconds
+#timeouts, in milliseconds
 ck.connection.timeout=10000
 ck.socket.timeout=300000
-ck.batch=10000
+#clickhouse insert batch size, in rows
+ck.batch=10
+#time to accumulate a batch before the clickhouse insert, in milliseconds
+sink.ck.batch.delay.time=1000
 
 #flink log lateness timeout
 flink.watermark.max.delay.time=50
@@ -68,13 +63,13 @@ log.aggregate.duration=5
 log.aggregate.duration.graph=5
 
 #arangoDB parameters
-arangoDB.host=192.168.45.102
-arangoDB.port=8529
-arangoDB.user=root
-arangoDB.password=galaxy_2019
-arangoDB.DB.name=knowledge
-arangoDB.batch=100000
-arangoDB.ttl=3600
-thread.pool.number=10
-
-sink.batch.delay.time=5
+arangodb.host=192.168.45.102
+arangodb.port=8529
+arangodb.user=root
+arangodb.password=galaxy_2019
+arangodb.db.name=knowledge
+arangodb.batch=100000
+arangodb.ttl=3600
+arangodb.thread.pool.number=10
+#batch accumulation time, in milliseconds
+sink.arangodb.batch.delay.time=5
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 3b68285..7889c88 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -44,8 +44,8 @@ public class FlowWriteConfig {
     public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
     public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism");
     public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
-    public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
-    public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
+    public static final Integer DATA_CENTER_ID_NUM = 0;
+    public static final Integer LOG_TYPE = FlowWriteConfigurations.getIntProperty(0, "log.type");
     public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset");
     public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
     public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
@@ -85,7 +85,7 @@ public class FlowWriteConfig {
      */
     public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
     public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
-    public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
+    public static final String ZOOKEEPER_SERVERS = "zookeeper.servers";
 
     public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
 
@@ -114,22 +114,24 @@ public class FlowWriteConfig {
     public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
     public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");
 
-    public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.visit.ip2ip");
-    public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.cname.domain2domain");
-    public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.mx.domain2domain");
-    public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.resolve.domain2ip");
-    public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.nx.domain2domain");
+    public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.visit.ip2ip");
+    public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.cname.domain2domain");
+    public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.mx.domain2domain");
+    public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.resolve.domain2ip");
+    public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.nx.domain2domain");
 
-    public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
-    public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
-    public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user");
-    public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangoDB.password");
-    public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name");
-    public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl");
-    public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch");
-    public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number");
+    public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangodb.host");
+    public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangodb.port");
+    public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangodb.user");
+    public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangodb.password");
+    public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangodb.db.name");
+    public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangodb.ttl");
+    public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangodb.batch");
+    public static final Integer ARANGODB_THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "arangodb.thread.pool.number");
 
-    public static final Integer SINK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.batch.delay.time");
+    public static final Integer SINK_CK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.ck.batch.delay.time");
FlowWriteConfigurations.getIntProperty(0, "sink.ck.batch.delay.time"); + public static final Integer SINK_ARANGODB_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.arangodb.batch.delay.time"); public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch"); + public static final Integer SINK_CK_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.ck.raw.log.insert.open"); } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java new file mode 100644 index 0000000..ba348bd --- /dev/null +++ b/src/main/java/com/zdjizhi/etl/CountTriggerWithTimeout.java @@ -0,0 +1,132 @@ +package com.zdjizhi.etl; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; + + +/** + *  * 带超时的计数窗口触发器 + */ +public class CountTriggerWithTimeout extends Trigger { + private static Log logger = LogFactory.get(); + + /** + * 窗口最大数据量 + */ + private int maxCount; + /** + * event time / process time + */ + private TimeCharacteristic timeType; + + private String stateName; + + public String getStateName() { + return stateName; + } + + public void setStateName(String stateName) { + this.stateName = stateName; + } + + public CountTriggerWithTimeout(String stateName) { + this.stateName = stateName; + } + + /** + * 用于储存窗口当前数据量的状态对象 + */ + private ReducingStateDescriptor countStateDescriptor = new ReducingStateDescriptor(getStateName() + "counter", new Sum(), LongSerializer.INSTANCE); + + + public CountTriggerWithTimeout(String stateName, int maxCount, TimeCharacteristic timeType) { + + this.maxCount = maxCount; + this.timeType = timeType; + this.stateName = stateName; + } + + + private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception { + clear(window, ctx); + return TriggerResult.FIRE_AND_PURGE; + } + + + @Override + public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { + ReducingState countState = ctx.getPartitionedState(countStateDescriptor); + countState.add(1L); + + + if (countState.get() >= maxCount) { + logger.info("fire with count: " + countState.get()); + return fireAndPurge(window, ctx); + } + if (timestamp >= window.getEnd()) { + logger.info("fire with tiem: " + timestamp); + return fireAndPurge(window, ctx); + } else { + return TriggerResult.CONTINUE; + } + } + + + @Override + public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { + if (timeType != TimeCharacteristic.ProcessingTime) { + return TriggerResult.CONTINUE; + } + + + if (time >= window.getEnd()) { + return TriggerResult.CONTINUE; + } else { + logger.info("fire with process tiem: " + time); + return fireAndPurge(window, ctx); + } + } + + + @Override + public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { + if (timeType != TimeCharacteristic.EventTime) { + return 
TriggerResult.CONTINUE; + } + + + if (time >= window.getEnd()) { + return TriggerResult.CONTINUE; + } else { + logger.info("fire with event tiem: " + time); + return fireAndPurge(window, ctx); + } + } + + + @Override + public void clear(TimeWindow window, TriggerContext ctx) throws Exception { + ReducingState countState = ctx.getPartitionedState(countStateDescriptor); + countState.clear(); + } + + /** + * 计数方法 + */ + class Sum implements ReduceFunction { + + + @Override + public Long reduce(Long value1, Long value2) throws Exception { + return value1 + value2; + } + } +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index 7ca7acb..e7ac08e 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -7,12 +7,14 @@ import cn.hutool.log.LogFactory; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.enums.DnsType; import com.zdjizhi.etl.CKBatchWindow; +import com.zdjizhi.etl.CountTriggerWithTimeout; import com.zdjizhi.etl.connection.*; import com.zdjizhi.etl.dns.*; import com.zdjizhi.utils.arangodb.ArangoDBSink; import com.zdjizhi.utils.ck.ClickhouseSink; import com.zdjizhi.utils.kafka.KafkaConsumer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; @@ -35,9 +37,10 @@ public class LogFlowWriteTopology { env.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT); //1 connection,2 dns - if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) { + if (FlowWriteConfig.LOG_TYPE == 1) { //connection DataStream> connSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_CONNECTION)) + .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) .filter(Objects::nonNull) .setParallelism(SOURCE_PARALLELISM) .name(SOURCE_KAFKA_TOPIC_CONNECTION); @@ -53,9 +56,11 @@ public class LogFlowWriteTopology { .assignTimestampsAndWatermarks(WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME)) .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000)) + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM) .keyBy(new IpKeysSelector()) .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION))) .process(new ConnProcessFunction()) + .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM) .filter(x -> Objects.nonNull(x)) .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); @@ -74,20 +79,43 @@ public class LogFlowWriteTopology { .keyBy(new IpKeysSelector()) .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH))) .process(new Ip2IpGraphProcessFunction()) + .setParallelism(TRANSFORM_PARALLELISM) .filter(Objects::nonNull) .setParallelism(TRANSFORM_PARALLELISM); //写入CKsink,批量处理 - connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink"); - sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink"); - 
-        sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+        if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
+            connSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                    .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                    .apply(new CKBatchWindow())
+                    .addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION))
+                    .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+                    .name("CKSink");
+
+            sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                    .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_SKETCH, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                    .apply(new CKBatchWindow())
+                    .addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH))
+                    .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+                    .name("CKSink");
+        }
+        sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_CONNECTION, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                .apply(new CKBatchWindow())
+                .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION))
+                .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+                .name("CKSink");
 
         //Write to arangodb
-        ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
-
-        } else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
+        ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
+                .trigger(new CountTriggerWithTimeout<>(R_VISIT_IP2IP, ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
+                .apply(new ArangodbBatchIPWindow())
+                .addSink(new ArangoDBSink(R_VISIT_IP2IP))
+                .setParallelism(FlowWriteConfig.SINK_PARALLELISM).name(R_VISIT_IP2IP);
+        } else if (FlowWriteConfig.LOG_TYPE == 2) {
             DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(SOURCE_KAFKA_TOPIC_DNS))
+                    .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
                     .filter(Objects::nonNull)
                     .map(new DnsMapFunction())
                     .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
@@ -96,23 +124,30 @@
             DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
                     .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
                     .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
+                    .setParallelism(TRANSFORM_PARALLELISM)
                     .flatMap(new DnsSplitFlatMapFunction())
+                    .setParallelism(TRANSFORM_PARALLELISM)
                     .keyBy(new DnsGraphKeysSelector())
                     .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
                     .process(new DnsRelationProcessFunction())
+                    .setParallelism(TRANSFORM_PARALLELISM)
                     .filter(Objects::nonNull)
                     .setParallelism(TRANSFORM_PARALLELISM);
 
             //Write the raw dns logs to ClickHouse
-            dnsSource.filter(Objects::nonNull)
-                    .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
-                    .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
-                    .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
-                    .name("CKSink");
+            if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
+                dnsSource.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                        .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                        .apply(new CKBatchWindow())
+                        .addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
+                        .setParallelism(FlowWriteConfig.SINK_PARALLELISM)
+                        .name("CKSink");
+            }
 
             //Write the split dns relation logs to ClickHouse
-            dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
+            dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
+                    .trigger(new CountTriggerWithTimeout<>(SINK_CK_TABLE_RELATION_DNS, CK_BATCH, TimeCharacteristic.ProcessingTime))
+                    .apply(new CKBatchWindow())
                     .addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
                     .setParallelism(SINK_PARALLELISM)
                     .name("CKSink");
@@ -125,7 +160,10 @@
 
             for (DnsType dnsEnum : DnsType.values()) {
                 dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
-                        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchDnsWindow())
+                        .setParallelism(SINK_PARALLELISM)
+                        .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
+                        .trigger(new CountTriggerWithTimeout<>(dnsEnum.getSink(), ARANGODB_BATCH, TimeCharacteristic.ProcessingTime))
+                        .apply(new ArangodbBatchDnsWindow())
                         .addSink(new ArangoDBSink(dnsEnum.getSink()))
                         .setParallelism(SINK_PARALLELISM)
                         .name("ArangodbSink");
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
index caf4e79..4902e5f 100644
--- a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
+++ b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBConnect.java
@@ -29,7 +29,7 @@ public class ArangoDBConnect {
     private static void getArangoDB() {
         arangoDB = new ArangoDB.Builder()
-                .maxConnections(FlowWriteConfig.THREAD_POOL_NUMBER)
+                .maxConnections(FlowWriteConfig.ARANGODB_THREAD_POOL_NUMBER)
                 .host(FlowWriteConfig.ARANGODB_HOST, FlowWriteConfig.ARANGODB_PORT)
                 .user(FlowWriteConfig.ARANGODB_USER)
                 .password(FlowWriteConfig.ARANGODB_PASSWORD)
diff --git a/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java b/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java
deleted file mode 100644
index 2449244..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/BalancedClickhouseDataSource.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
-package com.zdjizhi.utils.ck;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import ru.yandex.clickhouse.ClickHouseConnection;
-import ru.yandex.clickhouse.ClickHouseDriver;
-import ru.yandex.clickhouse.ClickhouseJdbcUrlParser;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
-
-import javax.sql.DataSource;
-import java.io.PrintWriter;
-import java.net.URISyntaxException;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-*/
-/**
- * DataSource implementation that provides load balancing
- *//*
-
-public class BalancedClickhouseDataSource implements DataSource {
-    private static final Logger log = LoggerFactory.getLogger(BalancedClickhouseDataSource.class);
-    private static final Pattern URL_TEMPLATE = Pattern.compile("jdbc:clickhouse://([a-zA-Z0-9_:,.-]+)(/[a-zA-Z0-9_]+([?][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+([&][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+)*)?)?");
-    private PrintWriter printWriter;
-    private int loginTimeoutSeconds;
-    //Random number generator
-    private final ThreadLocal<Random> randomThreadLocal;
-    //All urls
-    private final List<String> allUrls;
-    //Enabled (available) urls
-    private volatile List<String> enabledUrls;
-    private final ClickHouseProperties properties;
-    private final ClickHouseDriver driver;
-
-    public BalancedClickhouseDataSource(String url) {
-        this(splitUrl(url), getFromUrl(url));
-    }
-
-    public BalancedClickhouseDataSource(String url, Properties properties) {
-        this(splitUrl(url), new ClickHouseProperties(properties));
-    }
-
-    public BalancedClickhouseDataSource(String url, ClickHouseProperties properties) {
-        this(splitUrl(url), properties.merge(getFromUrlWithoutDefault(url)));
-    }
-
-    private BalancedClickhouseDataSource(List<String> urls) {
-        this(urls, new ClickHouseProperties());
-    }
-
-    private BalancedClickhouseDataSource(List<String> urls, Properties info) {
-        this(urls, new ClickHouseProperties(info));
-    }
-
-    private BalancedClickhouseDataSource(List<String> urls, ClickHouseProperties properties) {
-        this.loginTimeoutSeconds = 0;
-        this.randomThreadLocal = new ThreadLocal();
-        this.driver = new ClickHouseDriver();
-        if (urls.isEmpty()) {
-            throw new IllegalArgumentException("Incorrect ClickHouse jdbc url list. It must be not empty");
-        } else {
-            try {
-                //Parse the configuration
-                ClickHouseProperties localProperties = ClickhouseJdbcUrlParser.parse((String)urls.get(0), properties.asProperties());
-                localProperties.setHost((String)null);
-                localProperties.setPort(-1);
-                this.properties = localProperties;
-            } catch (URISyntaxException var8) {
-                throw new IllegalArgumentException(var8);
-            }
-
-            List<String> allUrls = new ArrayList(urls.size());
-            Iterator var4 = urls.iterator();
-
-            while(var4.hasNext()) {
-                String url = (String)var4.next();
-
-                try {
-                    //If the url is valid
-                    if (this.driver.acceptsURL(url)) {
-                        //Add it to the list of all urls
-                        allUrls.add(url);
-                    } else {
-                        log.error("that url is has not correct format: {}", url);
-                    }
-                } catch (SQLException var7) {
-                    throw new IllegalArgumentException("error while checking url: " + url, var7);
-                }
-            }
-
-            if (allUrls.isEmpty()) {
-                throw new IllegalArgumentException("there are no correct urls");
-            } else {
-                //All urls
-                this.allUrls = Collections.unmodifiableList(allUrls);
-                //Enabled urls
-                this.enabledUrls = this.allUrls;
-            }
-        }
-    }
-
-    */
-/**
- * Split the url
- * @param url
- * @return
- *//*
-
-    static List<String> splitUrl(String url) {
-        //Validate the url
-        Matcher m = URL_TEMPLATE.matcher(url);
-        if (!m.matches()) {
-            throw new IllegalArgumentException("Incorrect url");
-        } else {
-            String database = m.group(2);
-            if (database == null) {
-                database = "";
-            }
-
-            //Split the url string
-            String[] hosts = m.group(1).split(",");
-            List<String> result = new ArrayList(hosts.length);
-            String[] var5 = hosts;
-            int var6 = hosts.length;
-
-            //Iterate and add the split urls
-            for(int var7 = 0; var7 < var6; ++var7) {
-                String host = var5[var7];
-                result.add("jdbc:clickhouse://" + host + database);
-            }
-
-            return result;
-        }
-    }
-
-    */
-/**
- * Ping a url to see whether it is available
- * @param url
- * @return
- *//*
-
-    private boolean ping(String url) {
-        try {
-            //Run a simple sql statement to test that the url is reachable
-            this.driver.connect(url, this.properties).createStatement().execute("SELECT 1");
-            return true;
-        } catch (Exception var3) {
-            return false;
-        }
-    }
-
-    */
-/**
- * Iterate over all urls and select the available ones by pinging them
- * @return
- *//*
-
-    public synchronized int actualize() {
-        //Build a new list of enabled urls
-        List<String> enabledUrls = new ArrayList(this.allUrls.size());
-        Iterator var2 = this.allUrls.iterator();
-
-        while(var2.hasNext()) {
-            String url = (String)var2.next();
-            log.debug("Pinging disabled url: {}", url);
-            if (this.ping(url)) {
-                log.debug("Url is alive now: {}", url);
-                //Only urls that answer the ping are added to the enabled list
-                enabledUrls.add(url);
-            } else {
-                log.debug("Url is dead now: {}", url);
-            }
-        }
-
-        //Reset the enabled url list
-        this.enabledUrls = Collections.unmodifiableList(enabledUrls);
-        return enabledUrls.size();
-    }
-
-    */
-/**
- * Randomly pick an enabled url and return it
- * @return
- * @throws java.sql.SQLException
- *//*
-
-    private String getAnyUrl() throws SQLException {
-        //List of enabled urls
-        List<String> localEnabledUrls = this.enabledUrls;
-        if (localEnabledUrls.isEmpty()) {
-            throw new SQLException("Unable to get connection: there are no enabled urls");
-        } else {
-            Random random = (Random)this.randomThreadLocal.get();
-            if (random == null) {
-                this.randomThreadLocal.set(new Random());
-                //Create a random number generator
-                random = (Random)this.randomThreadLocal.get();
-            }
-
-            int index = random.nextInt(localEnabledUrls.size());
-            //Use the random index to pick an enabled url and return it
-            return (String)localEnabledUrls.get(index);
-        }
-    }
-
-    public ClickHouseConnection getConnection() throws SQLException {
-        return this.driver.connect(this.getAnyUrl(), this.properties);
-    }
-
-    public ClickHouseConnection getConnection(String username, String password) throws SQLException {
-        return this.driver.connect(this.getAnyUrl(), this.properties.withCredentials(username, password));
-    }
-
-    public <T> T unwrap(Class<T> iface) throws SQLException {
-        if (iface.isAssignableFrom(this.getClass())) {
-            return iface.cast(this);
-        } else {
-            throw new SQLException("Cannot unwrap to " + iface.getName());
-        }
-    }
-
-    public boolean isWrapperFor(Class<?> iface) throws SQLException {
-        return iface.isAssignableFrom(this.getClass());
-    }
-
-    public PrintWriter getLogWriter() throws SQLException {
-        return this.printWriter;
-    }
-
-    public void setLogWriter(PrintWriter printWriter) throws SQLException {
-        this.printWriter = printWriter;
-    }
-
-    public void setLoginTimeout(int seconds) throws SQLException {
-        this.loginTimeoutSeconds = seconds;
-    }
-
-    public int getLoginTimeout() throws SQLException {
-        return this.loginTimeoutSeconds;
-    }
-
-    public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
-        throw new SQLFeatureNotSupportedException();
-    }
-
-    */
-/**
- * Periodically clean up idle url connections
- * @param rate
- * @param timeUnit
- * @return
- *//*
-
-    public BalancedClickhouseDataSource withConnectionsCleaning(int rate, TimeUnit timeUnit) {
-        this.driver.scheduleConnectionsCleaning(rate, timeUnit);
-        return this;
-    }
-
-    */
-/**
- * Periodically re-check the urls via a scheduled task so that the enabled url list stays up to date
- * @param delay
- * @param timeUnit
- * @return
- *//*
-
-    public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {
-        ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {
-            public void run() {
-                try {
-                    BalancedClickhouseDataSource.this.actualize();
-                } catch (Exception var2) {
-                    BalancedClickhouseDataSource.log.error("Unable to actualize urls", var2);
-                }
-
-            }
-        }, 0L, (long)delay, timeUnit);
-        return this;
-    }
-
-    public List<String> getAllClickhouseUrls() {
-        return this.allUrls;
-    }
-
-    public List<String> getEnabledClickHouseUrls() {
-        return this.enabledUrls;
-    }
-
-    */
-/**
- * Return the set of disabled urls,
- * computed as the difference between all urls and the enabled ones
- *
- * @return
- *//*
-
-    public List<String> getDisabledUrls() {
-        List<String> enabledUrls = this.enabledUrls;
-        if (!this.hasDisabledUrls()) {
-            return Collections.emptyList();
-        } else {
-            List<String> disabledUrls = new ArrayList(this.allUrls);
-            disabledUrls.removeAll(enabledUrls);
-            return disabledUrls;
-        }
-    }
-
-    public boolean hasDisabledUrls() {
-        return this.allUrls.size() != this.enabledUrls.size();
-    }
-
-    public ClickHouseProperties getProperties() {
-        return this.properties;
-    }
-
-    private static ClickHouseProperties getFromUrl(String url) {
-        return new ClickHouseProperties(getFromUrlWithoutDefault(url));
-    }
-
-    private static Properties getFromUrlWithoutDefault(String url) {
-        if (StringUtils.isBlank(url)) {
-            return new Properties();
-        } else {
-            int index = url.indexOf("?");
-            return index == -1 ? new Properties() : ClickhouseJdbcUrlParser.parseUriQueryPart(url.substring(index + 1), new Properties());
-        }
-    }
-}*/
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index d3e14cc..7f9c63d 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -50,9 +50,8 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         try {
-//            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
-//            connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
             ClickHouseProperties properties = new ClickHouseProperties();
+            properties.setDatabase(CK_DATABASE);
             properties.setUser(CK_USERNAME);
             properties.setPassword(CK_PIN);
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
deleted file mode 100644
index 975731c..0000000
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseUtil.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.zdjizhi.utils.ck;
-
-import cn.hutool.core.io.IoUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import ru.yandex.clickhouse.BalancedClickhouseDataSource;
-import ru.yandex.clickhouse.settings.ClickHouseProperties;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import static com.zdjizhi.common.FlowWriteConfig.*;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-10
- **/
-public class ClickhouseUtil {
-
-    private static final Log log = LogFactory.get();
-
-    private static Connection connection;
-
-
-    public static Connection getConnection() {
-        try {
-//            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
-//            connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
-            ClickHouseProperties properties = new ClickHouseProperties();
-            properties.setDatabase(CK_DATABASE);
-            properties.setUser(CK_USERNAME);
-            properties.setPassword(CK_PIN);
-            properties.setConnectionTimeout(CK_CONNECTION_TIMEOUT);
-            properties.setSocketTimeout(CK_SOCKET_TIMEOUT);
-
-            BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
-            connection = dataSource.getConnection();
-
-            log.info("get clickhouse connection success");
-            return connection;
-        } catch (SQLException e) {
-            log.error("clickhouse connection error ,{}", e);
-        }
-        return null;
-    }
-
-    public static void close() {
-        IoUtil.close(connection);
-    }
-
-}