Code optimization: use a scheduled thread pool to flush writes to ClickHouse
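The commit swaps the windowed ClickhouseSink path for a new CKSink (added below under src/main/java/com/zdjizhi/utils/ck/CKSink.java) that buffers rows and flushes them to ClickHouse either when ck.batch rows have accumulated or when a scheduled thread pool fires every sink.ck.batch.delay.time milliseconds. A minimal, self-contained sketch of that buffer-plus-timer pattern follows; the class and method names here are illustrative and are not part of the commit.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch (not the project's CKSink): buffer rows and flush either
// when the batch-size threshold is reached or on a fixed timer.
public class ScheduledBatchBuffer<T> {

    private final List<T> buffer = new ArrayList<>();
    private final int batchSize;   // analogous to ck.batch
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public ScheduledBatchBuffer(int batchSize, long flushIntervalMs) {
        this.batchSize = batchSize;
        // Timer-driven flush, analogous to scheduleWithFixedDelay in CKSink.open().
        scheduler.scheduleWithFixedDelay(this::flush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    public synchronized void add(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {   // size-driven flush, as in CKSink.invoke()
            flush();
        }
    }

    private synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // A real sink would execute a JDBC batch insert here.
        System.out.println("flushing " + buffer.size() + " rows");
        buffer.clear();
    }

    public synchronized void close() {
        flush();
        scheduler.shutdown();
    }
}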
pom.xml (7 changed lines)
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>knowledge-log</artifactId>
<version>20220819</version>
<version>20220901</version>

<name>log-completion-schema</name>
<url>http://www.example.com</url>
@@ -235,6 +235,11 @@
<artifactId>HikariCP</artifactId>
<version>3.2.0</version>
</dependency>-->
<!-- <dependency>-->
<!-- <groupId>ru.ivi.opensource</groupId>-->
<!-- <artifactId>flink-clickhouse-sink</artifactId>-->
<!-- <version>1.3.3</version>-->
<!-- </dependency>-->
</dependencies>
</project>

@@ -1,25 +1,25 @@
#--------------------------------Address configuration------------------------------#
#Management Kafka address; join multiple hosts with commas: ip1:9094,ip2:9094
source.kafka.servers=192.168.44.85:9094,192.168.44.86:9094,192.168.44.87:9094
source.kafka.servers=192.168.44.12:9094

#Management output Kafka address
sink.kafka.servers=
sink.kafka.servers=192.168.44.12:9094
#--------------------------------HTTP / geolocation library / SSL------------------------------#
tools.library=
#--------------------------------Kafka consumer/producer configuration------------------------------#
#Topic to read; the consumer offset for this spout id is stored and can be named after the topology; it determines where offsets are kept so the next read does not repeat data
group.id=KNOWLEDGE-GROUP5
group.id=KNOWLEDGE-GROUPtest
#--------------------------------Topology configuration------------------------------#
#Consumer parallelism
source.parallelism=1
#Transform function parallelism
transform.parallelism=1
#Kafka producer parallelism
sink.parallelism=3
sink.parallelism=1

#--------------------------------Business configuration------------------------------#
#1 = connection logs, 2 = DNS logs
log.type=2
log.type=1

#Producer compression mode: none or snappy
producer.kafka.compression.type=none
@@ -27,6 +27,10 @@ producer.kafka.compression.type=none
source.kafka.topic.connection=CONNECTION-RECORD-LOG
source.kafka.topic.sketch=CONNECTION-SKETCH-RECORD-LOG
source.kafka.topic.dns=DNS-RECORD-LOG
#Statistics written back to Kafka
sink.kafka.topic.relation.connection=CONNECTION-RELATION-LOG
sink.kafka.topic.relation.dns=DNS-RELATION-LOG

#ClickHouse local tables to write to
sink.ck.table.connection=connection_record_log_local
sink.ck.table.sketch=connection_sketch_record_log_local
@@ -43,7 +47,8 @@ sink.arangodb.table.r.nx.domain2domain=R_NX_DOMAIN2DOMAIN
#Write raw logs to ClickHouse via Flink: 0 = no, 1 = yes
sink.ck.raw.log.insert.open=1
#ClickHouse configuration; join multiple hosts with commas: ip1:8123,ip2:8123
ck.hosts=192.168.44.85:8123,192.168.44.86:8123,192.168.44.87:8123
ck.hosts=192.168.44.12:8123
# ,192.168.44.86:8123,192.168.44.87:8123
ck.database=tsg_galaxy_v3
ck.username=tsg_insert
ck.pin=galaxy2019
@@ -51,19 +56,19 @@ ck.pin=galaxy2019
ck.connection.timeout=10000
ck.socket.timeout=600000
#ClickHouse insert batch size, in rows
ck.batch=100000
ck.batch=20000
#Time to accumulate a batch before writing to ClickHouse, in milliseconds
sink.ck.batch.delay.time=30000
sink.ck.batch.delay.time=3000

#Flink log lateness timeout
flink.watermark.max.delay.time=60
#ClickHouse relation aggregation interval, in seconds
log.aggregate.duration=30
log.aggregate.duration=10
#ArangoDB aggregation interval, in seconds
log.aggregate.duration.graph=600
log.aggregate.duration.graph=30

#ArangoDB parameters
arangodb.host=192.168.44.83
arangodb.host=192.168.44.12
arangodb.port=8529
arangodb.user=root
arangodb.password=galaxy_2019
@@ -73,3 +78,5 @@ arangodb.ttl=3600
arangodb.thread.pool.number=10
#Batch accumulation time, in milliseconds
sink.arangodb.batch.delay.time=1000

aggregate.max.value.length=18
@@ -106,7 +106,8 @@ public class FlowWriteConfig {
;
public static final String SOURCE_KAFKA_TOPIC_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.connection");
public static final String SOURCE_KAFKA_TOPIC_SKETCH = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.sketch");

public static final String SINK_KAFKA_TOPIC_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic.relation.connection");
public static final String SINK_KAFKA_TOPIC_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic.relation.dns");
//sink.ck.table
public static final String SINK_CK_TABLE_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.connection");
public static final String SINK_CK_TABLE_SKETCH = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.sketch");
@@ -114,6 +115,7 @@ public class FlowWriteConfig {
public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");


public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.visit.ip2ip");
public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.cname.domain2domain");
public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.mx.domain2domain");
@@ -134,4 +136,5 @@ public class FlowWriteConfig {
public static final Integer SINK_ARANGODB_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.arangodb.batch.delay.time");
public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
public static final Integer SINK_CK_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.ck.raw.log.insert.open");
public static final Integer AGGREGATE_MAX_VALUE_LENGTH = FlowWriteConfigurations.getIntProperty(0, "aggregate.max.value.length");
}
@@ -18,12 +18,7 @@ public enum LogMetadata {
* Log name, table name, fields
* */

CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{
"cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status",
"dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy",
"user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent",
"http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num",
"imei", "imsi"}),
CONNECTION_RECORD_LOG("connection_record_log", "connection_record_log_local", new String[]{"cap_ip", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "protocol", "fxo_id", "link_status", "dir_status", "total_cs_pkts", "total_sc_pkts", "total_cs_bytes", "total_sc_bytes", "log_gen_time", "aa", "wv", "yy", "user_mask", "conn_start_time", "app_class", "app_id", "http_host", "http_url", "http_cookie", "http_user_agent", "http_method", "http_accept", "http_accept_encoding", "http_referer", "http_rescode", "tls_sni", "tls_cert", "phone_num", "imei", "imsi"}),
CONNECTION_RELATION_LOG("connection_relation_log", "connection_relation_log_local", new String[]{"start_time", "end_time", "src_ip", "dst_ip", "sessions", "packets", "bytes"}),
CONNECTION_SKETCH_RECORD_LOG("connection_sketch_record_log", "connection_sketch_record_log_local", new String[]{"sled_ip", "sketch_start_time", "sketch_duration", "src_ip", "dst_ip", "sketch_sessions", "sketch_packets", "sketch_bytes"}),
DNS_RECORD_LOG("dns_record_log", "dns_record_log_local", new String[]{"capture_time", "recv_ip", "src_ip", "dst_ip", "src_port", "dst_port", "addr_type", "dns_flag", "ttl", "protocol", "fxo_id", "req_type", "qname", "response", "dns_a", "dns_a_num", "dns_cname", "dns_cname_num", "dns_aaaa", "dns_aaaa_num", "dns_mx", "dns_mx_num", "dns_ns", "dns_ns_num"}),
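The CKSink added later in this commit builds its INSERT statements from this enum in a static block via LogMetadata.preparedSql(value.getSink()); that method is not part of the diff. A hedged sketch of what such a helper presumably does, assuming it renders the local table name and field list into a parameterized INSERT (an assumption, not the project's actual code):

import java.util.Collections;

// Assumed behaviour of LogMetadata.preparedSql(sink) (not shown in this commit):
// join the enum's field list into a parameterized INSERT for the local table.
class PreparedSqlSketch {
    static String preparedSql(String table, String[] fields) {
        String columns = String.join(", ", fields);
        String placeholders = String.join(", ", Collections.nCopies(fields.length, "?"));
        return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        // e.g. for CONNECTION_RELATION_LOG above:
        System.out.println(preparedSql("connection_relation_log_local",
                new String[]{"start_time", "end_time", "src_ip", "dst_ip", "sessions", "packets", "bytes"}));
        // -> INSERT INTO connection_relation_log_local (start_time, end_time, src_ip, dst_ip, sessions, packets, bytes) VALUES (?, ?, ?, ?, ?, ?, ?)
    }
}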

@@ -1,8 +1,10 @@
package com.zdjizhi.etl;

import cn.hutool.json.JSONUtil;
import com.zdjizhi.etl.connection.ArangodbBatchIPWindow;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.ClickhouseSink;
import com.zdjizhi.utils.ck.CKSink;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
@@ -12,19 +14,21 @@ import java.util.Map;

import static com.zdjizhi.common.FlowWriteConfig.*;

public interface LogService {
public class LogService {

public static void getLogCKSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception{

sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
.trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime))
.apply(new CKBatchWindow())
.addSink(new ClickhouseSink(sink))
.setParallelism(SINK_PARALLELISM)
.name(sink)
.setParallelism(SINK_PARALLELISM);

}
// @Deprecated
// public static void getLogCKSink3(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
//
// sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_CK_BATCH_DELAY_TIME)))
// .trigger(new CountTriggerWithTimeout<>(sink, CK_BATCH, TimeCharacteristic.ProcessingTime))
// .apply(new CKBatchWindow())
// .addSink(new ClickhouseSink(sink))
// .setParallelism(SINK_PARALLELISM)
// .name(sink)
// .setParallelism(SINK_PARALLELISM);
//
// }

public static void getLogArangoSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(SINK_ARANGODB_BATCH_DELAY_TIME)))
@@ -35,4 +39,21 @@ public interface LogService {
.name(sink)
.setParallelism(SINK_PARALLELISM);
}

public static void getLogKafkaSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
sourceStream.map(JSONUtil::toJsonStr)
.setParallelism(SINK_PARALLELISM)
.addSink(KafkaProducer.getKafkaProducer(sink))
.setParallelism(SINK_PARALLELISM)
.name(sink)
.setParallelism(SINK_PARALLELISM);
}

public static void getLogCKSink(DataStream<Map<String, Object>> sourceStream, String sink) throws Exception {
sourceStream.addSink(new CKSink(sink))
.setParallelism(SINK_PARALLELISM)
.name(sink)
.setParallelism(SINK_PARALLELISM);

}
}
@@ -7,8 +7,9 @@ import com.zdjizhi.etl.dns.SketchTimeMapFunction;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
@@ -25,17 +26,18 @@ public class ConnLogService {
DataStream<Map<String, Object>> connSource = getLogSource(env, SOURCE_KAFKA_TOPIC_CONNECTION);
//sketch
DataStream<Map<String, Object>> sketchSource = getLogSource(env, SOURCE_KAFKA_TOPIC_SKETCH);

//Write to the CK sink, batched
LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);

LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);

//transform
DataStream<Map<String, Object>> connTransformStream = getConnTransformStream(connSource);

if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
//Write to the CK sink, batched
LogService.getLogCKSink(connSource, SINK_CK_TABLE_CONNECTION);
LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
//Write the connection relation table to ClickHouse
LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
} else {
LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
}

DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);

@@ -57,14 +59,31 @@ public class ConnLogService {

String timeFilter = SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? "conn_start_time" : "sketch_start_time";

DataStream<Map<String, Object>> sourceStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
SingleOutputStreamOperator<Map<String, Object>> filterStream = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
.setParallelism(SOURCE_PARALLELISM)
.filter(x -> Objects.nonNull(x) && Convert.toLong(x.get(timeFilter)) > 0)
.filter(x -> {
if (Objects.isNull(x) || Convert.toLong(x.get(timeFilter)) <= 0) {
return false;
}
if (SOURCE_KAFKA_TOPIC_CONNECTION.equals(source)) {
if (String.valueOf(x.get("total_cs_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_pkts")).length() >= AGGREGATE_MAX_VALUE_LENGTH ||
String.valueOf(x.get("total_cs_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("total_sc_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) {
return false;
}
return true;
} else if (SOURCE_KAFKA_TOPIC_SKETCH.equals(source)) {
if (String.valueOf(x.get("sketch_sessions")).length() >= AGGREGATE_MAX_VALUE_LENGTH || String.valueOf(x.get("sketch_packets")).length() >= AGGREGATE_MAX_VALUE_LENGTH ||
String.valueOf(x.get("sketch_bytes")).length() >= AGGREGATE_MAX_VALUE_LENGTH) {
return false;
}
return true;
} else {
return false;
}
}).setParallelism(SOURCE_PARALLELISM);
DataStream<Map<String, Object>> sourceStream = filterStream.map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction())
.setParallelism(SOURCE_PARALLELISM)
.map(SOURCE_KAFKA_TOPIC_CONNECTION.equals(source) ? new ConnTimeMapFunction() : new SketchTimeMapFunction())
.setParallelism(SOURCE_PARALLELISM)
.name(source)
.setParallelism(SOURCE_PARALLELISM);
.name(source);
return sourceStream;
}
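The digit-length filter above compares the string length of the counter fields against AGGREGATE_MAX_VALUE_LENGTH (aggregate.max.value.length=18 in the properties file). A plausible reading, not stated anywhere in the commit, is that records whose counters are 18 or more decimal digits long are dropped so that the later sums stay safely below Long.MAX_VALUE, which has 19 digits:

// Illustration of the guard (an interpretation, not project code).
class AggregateGuardExample {
    public static void main(String[] args) {
        int maxLongDigits = String.valueOf(Long.MAX_VALUE).length();               // 19
        boolean kept = String.valueOf(99_999_999_999_999_999L).length() < 18;      // 17 digits: record kept
        boolean dropped = String.valueOf(100_000_000_000_000_000L).length() >= 18; // 18 digits: record filtered out
        System.out.println(maxLongDigits + " " + kept + " " + dropped);
    }
}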

@@ -77,7 +96,7 @@ public class ConnLogService {
}))
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new ConnProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return connTransformStream;
@@ -88,7 +107,7 @@ public class ConnLogService {
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> TypeUtils.castToLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new SketchProcessFunction());
return sketchTransformStream;
}
@@ -96,7 +115,7 @@ public class ConnLogService {
private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return ip2ipGraph;

@@ -10,7 +10,7 @@
//import org.apache.flink.api.java.utils.ParameterTool;
//import org.apache.flink.streaming.api.datastream.DataStream;
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
//import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
//import org.apache.flink.streaming.api.windowing.time.Time;
//import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
//import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
@@ -134,7 +134,7 @@
// }))
// .setParallelism(TRANSFORM_PARALLELISM)
// .keyBy(new IpKeysSelector())
// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
// .process(new ConnProcessFunction())
// .setParallelism(TRANSFORM_PARALLELISM);
// return connTransformStream;
@@ -145,7 +145,7 @@
// .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
// .withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
// .keyBy(new IpKeysSelector())
// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
// .process(new SketchProcessFunction());
// return sketchTransformStream;
// }
@@ -153,7 +153,7 @@
// private static DataStream<Map<String, Object>> getConnUnion(DataStream<Map<String, Object>> connTransformStream, DataStream<Map<String, Object>> sketchTransformStream) throws Exception {
// DataStream<Map<String, Object>> ip2ipGraph = connTransformStream.union(sketchTransformStream)
// .keyBy(new IpKeysSelector())
// .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
// .window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
// .process(new Ip2IpGraphProcessFunction())
// .setParallelism(TRANSFORM_PARALLELISM);
// return ip2ipGraph;

@@ -56,16 +56,13 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
long connStartTimetime = Convert.toLong(newSketchLog.get("conn_start_time"));
if (connStartTimetime > 0) {
sessions++;
Long totalCsPkts = TypeUtils.castToLong(newSketchLog.get("total_cs_pkts"));
Long totalScPkts = TypeUtils.castToLong(newSketchLog.get("total_sc_pkts"));
packets = packets + totalCsPkts < Long.MAX_VALUE ? totalCsPkts : 0 + totalScPkts < Long.MAX_VALUE ? totalScPkts : 0;

Long totalCsBytes = TypeUtils.castToLong(newSketchLog.get("total_cs_bytes"));
Long totalScBytes = TypeUtils.castToLong(newSketchLog.get("total_sc_bytes"));
bytes = bytes + totalCsBytes < Long.MAX_VALUE ? totalCsBytes : 0 + totalScBytes < Long.MAX_VALUE ? totalScBytes : 0;
packets = packets + TypeUtils.castToLong(newSketchLog.get("total_cs_pkts")) + TypeUtils.castToLong(newSketchLog.get("total_sc_pkts"));
bytes = bytes + TypeUtils.castToLong(newSketchLog.get("total_cs_bytes")) + TypeUtils.castToLong(newSketchLog.get("total_sc_bytes"));

startTime = connStartTimetime < startTime ? connStartTimetime : startTime;
endTime = connStartTimetime > endTime ? connStartTimetime : endTime;
packets = packets > Long.MAX_VALUE ? 0 : packets;
bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
}
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);

@@ -0,0 +1,80 @@
package com.zdjizhi.etl.connection;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.util.TypeUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple5;

import java.util.Map;


/**
* @author 94976
*/
public class ConnReduceFunction implements ReduceFunction<Map<String, Object>> {

private static final Log logger = LogFactory.get();

//
// public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
// try {
// Tuple5<Long, Long, Long, Long, Long> values = sum(elements);
// if (values != null) {
// Map<String, Object> result = new HashMap<>();
// result.put("start_time", values.f0);
// result.put("end_time", values.f1);
// result.put("src_ip", keys.f0);
// result.put("dst_ip", keys.f1);
// result.put("sessions", values.f2);
// result.put("packets", values.f3);
// result.put("bytes", values.f4);
// out.collect(result);
// logger.debug("Intermediate aggregation result: {}", result.toString());
// }
// } catch (Exception e) {
// logger.error("Failed to get intermediate aggregation result, middleResult: {}", e);
// }
// }
//
private Tuple5<Long, Long, Long, Long, Long> sum(Map<String, Object> map1, Map<String, Object> map2) {

try {
long sessions = 0L;
long packets = 0L;
long bytes = 0L;
long startTime = DateUtil.currentSeconds();
long endTime = DateUtil.currentSeconds();

long connStartTime1 = Convert.toLong(map1.get("conn_start_time"));
long connStartTime2 = Convert.toLong(map2.get("conn_start_time"));
if (connStartTime1 > 0 && connStartTime2 > 0) {
sessions++;
packets = TypeUtils.castToLong(map1.get("total_cs_pkts")) + TypeUtils.castToLong(map1.get("total_sc_pkts")) +
TypeUtils.castToLong(map2.get("total_cs_pkts")) + TypeUtils.castToLong(map2.get("total_sc_pkts"));

bytes = bytes + TypeUtils.castToLong(map1.get("total_cs_bytes")) + TypeUtils.castToLong(map1.get("total_sc_bytes")) +
TypeUtils.castToLong(map2.get("total_cs_bytes")) + TypeUtils.castToLong(map2.get("total_sc_bytes"));

startTime = connStartTime1 < connStartTime2 ? connStartTime1 : connStartTime2;
endTime = connStartTime2 < connStartTime1 ? connStartTime1 : connStartTime2;

packets = packets > Long.MAX_VALUE ? 0 : packets;
bytes = bytes > Long.MAX_VALUE ? 0 : bytes;

}

} catch (Exception e) {
logger.error("Failed to aggregate intermediate result set {}", e);
}
return null;
}

@Override
public Map<String, Object> reduce(Map<String, Object> map1, Map<String, Object> map2) throws Exception {

return null;
}
}
@@ -69,11 +69,15 @@ public class SketchProcessFunction extends ProcessWindowFunction<Map<String, Obj
for (Map<String, Object> newSketchLog : elements) {
long connStartTime = Convert.toLong(newSketchLog.get("sketch_start_time"));
if (connStartTime > 0) {
sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions")) < Long.MAX_VALUE ? TypeUtils.castToLong(newSketchLog.get("sketch_sessions")) : 0;
packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets")) < Long.MAX_VALUE ? TypeUtils.castToLong(newSketchLog.get("sketch_packets")) : 0;
bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes")) < Long.MAX_VALUE ? TypeUtils.castToLong(newSketchLog.get("sketch_bytes")) : 0;
sessions += TypeUtils.castToLong(newSketchLog.get("sketch_sessions"));
packets += TypeUtils.castToLong(newSketchLog.get("sketch_packets"));
bytes += TypeUtils.castToLong(newSketchLog.get("sketch_bytes"));
startTime = connStartTime < startTime ? connStartTime : startTime;
endTime = connStartTime > endTime ? connStartTime : endTime;

sessions = sessions > Long.MAX_VALUE ? 0 : sessions;
packets = packets > Long.MAX_VALUE ? 0 : packets;
bytes = bytes > Long.MAX_VALUE ? 0 : bytes;
}
}
return Tuple5.of(startTime, endTime + LOG_AGGREGATE_DURATION, sessions, packets, bytes);

@@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.Map;


@@ -28,7 +28,7 @@ public class DnsGraphProcessFunction extends ProcessWindowFunction<Map<String, O
Long startTime = Convert.toLong(log.get("start_time"));
tmpTime = startTime > tmpTime ? startTime : tmpTime;
}
Map newLog = new LinkedHashMap<>();
Map newLog = new HashMap<>();
newLog.put("record_type", keys.f0);
newLog.put("qname", keys.f1);
newLog.put("record", keys.f2);

@@ -8,6 +8,7 @@ import com.zdjizhi.utils.kafka.KafkaConsumer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

@@ -23,13 +24,16 @@ public class DnsLogService {

DataStream<Map<String, Object>> dnsSource = getLogSource(env, SOURCE_KAFKA_TOPIC_DNS);

//Write raw DNS logs to ClickHouse
LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS);

DataStream<Map<String, Object>> dnsTransform = getDnsTransformStream(dnsSource);

if (SINK_CK_RAW_LOG_INSERT_OPEN == 1) {
//Write raw DNS logs to ClickHouse
LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS);
//Write the split DNS relation logs to ClickHouse
LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
} else {
LogService.getLogCKSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
}

//Write to ArangoDB, grouped by record_type into different tables
DataStream<Map<String, Object>> dnsGraph = dnsTransform.filter(Objects::nonNull)
@@ -67,7 +71,7 @@ public class DnsLogService {
.flatMap(new DnsSplitFlatMapFunction())
.setParallelism(TRANSFORM_PARALLELISM)
.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.window(TumblingEventTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new DnsRelationProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
return dnsTransform;

@@ -26,6 +26,8 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
public Map<String, Object> map(Map<String, Object> rawLog) throws Exception {
try {
rawLog.put("capture_time", TypeUtils.coverMSToS(rawLog.get("capture_time")));
//Lowercase qname and record
rawLog.put("qname", StringUtils.lowerCase(StrUtil.toString(rawLog.get("qname"))));
if (Objects.nonNull(rawLog.get("response"))) {
List<Map<String, Object>> response = (List<Map<String, Object>>) rawLog.get("response");
String dnsA = "";
@@ -40,7 +42,8 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
int dnsMxNum = 0;
for (Map<String, Object> resMap : response) {
String type = StrUtil.toString(resMap.get("res_type"));
String body = StrUtil.toString(resMap.get("res_body"));
String body = StringUtils.lowerCase(StrUtil.toString(resMap.get("res_body")));
resMap.put("res_body", body);
if (DnsType.A.getCode().equals(type)) {
dnsA = Joiner.on(",").skipNulls().join(dnsA, body);
dnsANum++;

src/main/java/com/zdjizhi/utils/ck/CKSink.java (new file, 202 lines)
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.zdjizhi.utils.ck;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.enums.LogMetadata;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import ru.yandex.clickhouse.ClickHousePreparedStatement;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH;
import static com.zdjizhi.common.FlowWriteConfig.SINK_CK_BATCH_DELAY_TIME;

public class CKSink extends RichSinkFunction<Map<String, Object>> {
private static final Log log = LogFactory.get();

private static final long serialVersionUID = 1L;

private static final Log logger = LogFactory.get();
private Connection connection;
private ClickHousePreparedStatement preparedStatement = null;
// Rows are buffered together with the ClickHouse cluster IP they will be written to
private final List<Map> ipWithDataList;

// Data is flushed once this time condition is met
private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L
// Insert batch size
private final int insertCkBatchSize = CK_BATCH; // 10 rows used for development testing

private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;

private static final Map<String, String[]> logMetadataFields = new HashMap<>();
private static final Map<String, String> logMetadataSql = new HashMap<>();

static {
for (LogMetadata value : LogMetadata.values()) {
logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink()));
logMetadataFields.put(value.getSink(), value.getFields());
}
}

// Target table name
private String sink;

public CKSink(String sink) {
this.sink = sink;
this.ipWithDataList = new CopyOnWriteArrayList<>();
}

public String getSink() {
return sink;
}

/**
* Connects to the target database and initializes the prepared statement.
*/
@Override
public void open(Configuration parameters) throws Exception {
connection = CKUtils.getConnection();
String sql = logMetadataSql.get(sink);
log.debug(sql);
preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);

if (insertCkTimeInterval != 0 && insertCkBatchSize != 1) {
this.scheduler = Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
synchronized (CKSink.this) {
if (!closed) {
try {
logger.info("ck_flush.............");
flushClose();
} catch (Exception e) {
log.error(e);
}
}
}
},
insertCkTimeInterval,
insertCkTimeInterval,
TimeUnit.MILLISECONDS);
}

}

@Override
public final synchronized void invoke(Map<String, Object> row, Context context) throws IOException {
ipWithDataList.add(row);
/**
* 1. Write the data to ClickHouse
*/
if (ipWithDataList.size() >= this.insertCkBatchSize) {
try {
flush(ipWithDataList);
logger.info("insertCkBatchSize");
} catch (SQLException e) {
throw new RuntimeException("Preparation of JDBC statement failed.", e);
}
}

}

// Insert the data
private synchronized void flush(List<Map> data) throws SQLException {
if (data.size() > 0) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.info("Starting ClickHouse write, rows: {}", data.size());

connection.setAutoCommit(false);

String[] logFields = logMetadataFields.get(sink);

for (Map<String, Object> map : data) {
for (int i = 0; i < logFields.length; i++) {
preparedStatement.setObject(i + 1, map.get(logFields[i]));
}
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
connection.commit();
preparedStatement.clearBatch();
stopWatch.stop();
log.info("Total time taken {} ms", stopWatch.getTime());
log.info("Wrote to ClickHouse table {}, rows {}", sink, data.size());
ipWithDataList.clear();
}
}

private synchronized void flushClose() {
try {
flush(ipWithDataList);
} catch (SQLException e) {
log.error("Preparation of JDBC statement failed.", e);
}
}

/**
* Executes prepared statement and closes all resources of this instance.
*
* @throws IOException Thrown, if the input could not be closed properly.
*/
@Override
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}

if (preparedStatement != null) {
flushClose();
try {
preparedStatement.close();
} catch (SQLException e) {
log.error("JDBC statement could not be closed: " + e.getMessage());
} finally {
preparedStatement = null;
}
}

try {
CKUtils.close(connection);
} catch (Exception e) {
log.error("JDBC connection could not be closed: " + e.getMessage());
}
}
}
}

src/main/java/com/zdjizhi/utils/ck/ClickHouseJDBCOutput.java (new file, 208 lines)
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.zdjizhi.utils.ck;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.enums.LogMetadata;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHousePreparedStatement;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH;
import static com.zdjizhi.common.FlowWriteConfig.SINK_CK_BATCH_DELAY_TIME;

public class ClickHouseJDBCOutput extends RichSinkFunction<Map<String, Object>> {
private static final Log log = LogFactory.get();

private static final long serialVersionUID = 1L;

private static final Logger logger = LoggerFactory.getLogger(ClickHouseJDBCOutput.class);

private Connection connection;
private ClickHousePreparedStatement preparedStatement = null;
// Rows are buffered together with the ClickHouse cluster IP they will be written to
private final List<Map> ipWithDataList;

// Data is flushed once this time condition is met
private final long insertCkTimeInterval = SINK_CK_BATCH_DELAY_TIME; // 4000L
// Insert batch size
private final int insertCkBatchSize = CK_BATCH; // 10 rows used for development testing

private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;

private static final Map<String, String[]> logMetadataFields = new HashMap<>();
private static final Map<String, String> logMetadataSql = new HashMap<>();

static {
for (LogMetadata value : LogMetadata.values()) {
logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink()));
logMetadataFields.put(value.getSink(), value.getFields());
}
}

// Target table name
private String sink;

public ClickHouseJDBCOutput(String sink) {
this.sink = sink;
this.ipWithDataList = new CopyOnWriteArrayList<>();
}

public String getSink() {
return sink;
}

/**
* Connects to the target database and initializes the prepared statement.
*/
@Override
public void open(Configuration parameters) throws Exception {
connection = CKUtils.getConnection();
String sql = logMetadataSql.get(sink);
log.debug(sql);
preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);

if (insertCkTimeInterval != 0 && insertCkBatchSize != 1) {
this.scheduler = Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
synchronized (ClickHouseJDBCOutput.this) {
if (!closed) {
try {
logger.info("ck_flush.............");
flushClose();
} catch (Exception e) {
// flushException = e;
log.error(e);
}
}
}
},
insertCkTimeInterval,
insertCkTimeInterval,
TimeUnit.MILLISECONDS);
}

}

@Override
public final synchronized void invoke(Map<String, Object> row, Context context) throws IOException {
ipWithDataList.add(row);
/**
* 1. Write the data to ClickHouse
*/
if (ipWithDataList.size() >= this.insertCkBatchSize) {
try {
flush(ipWithDataList);
logger.info("insertCkBatchSize");
} catch (SQLException e) {
throw new RuntimeException("Preparation of JDBC statement failed.", e);
}
}

}

// Insert the data
private synchronized void flush(List<Map> data) throws SQLException {
if (data.size() > 0) {
// checkFlushException();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.info("Starting ClickHouse write, rows: {}", data.size());

connection.setAutoCommit(false);

String[] logFields = logMetadataFields.get(sink);

for (Map<String, Object> map : data) {
for (int i = 0; i < logFields.length; i++) {
preparedStatement.setObject(i + 1, map.get(logFields[i]));
}
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
connection.commit();
preparedStatement.clearBatch();
stopWatch.stop();
log.info("Total time taken {} ms", stopWatch.getTime());
log.info("Wrote to ClickHouse table {}, rows {}", sink, data.size());
ipWithDataList.clear();
}
}

private synchronized void flushClose() {
try {
flush(ipWithDataList);
} catch (SQLException e) {
log.error("Preparation of JDBC statement failed.", e);
}
}

/**
* Executes prepared statement and closes all resources of this instance.
*
* @throws IOException Thrown, if the input could not be closed properly.
*/
@Override
public synchronized void close() throws IOException {
if (!closed) {
closed = true;

if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}

if (preparedStatement != null) {
flushClose();
try {
preparedStatement.close();
} catch (SQLException e) {
log.error("JDBC statement could not be closed: " + e.getMessage());
} finally {
preparedStatement = null;
}
}

try {
CKUtils.close(connection);
} catch (Exception e) {
log.error("JDBC connection could not be closed: " + e.getMessage());
}
}
}
}

@@ -4,7 +4,6 @@ import cn.hutool.core.io.IoUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.enums.LogMetadata;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import ru.yandex.clickhouse.ClickHousePreparedStatement;
@@ -63,11 +62,10 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
public void executeInsert(List<Map<String, Object>> data, String tableName) {

try {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// StopWatch stopWatch = new StopWatch();
// stopWatch.start();
log.debug("Starting ClickHouse write, rows: {}", data.size());

boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);

String[] logFields = logMetadataFields.get(tableName);
@@ -81,12 +79,11 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
}
preparedStatement.addBatch();
}

preparedStatement.executeBatch();
connection.commit();
connection.setAutoCommit(autoCommit);
stopWatch.stop();
log.debug("Total time taken {}", stopWatch.getTime());
// stopWatch.stop();
// log.debug("Total time taken {} ms", stopWatch.getTime());
log.debug("Wrote to ClickHouse table {}, rows {}", tableName, data.size());
} catch (Exception ex) {
log.error("ClickhouseSink insert error", ex);
}

src/main/java/com/zdjizhi/utils/ck/ClickhouseSink2.java (new file, 106 lines)
@@ -0,0 +1,106 @@
//package com.zdjizhi.utils.ck;
//
//import cn.hutool.core.io.IoUtil;
//import cn.hutool.log.Log;
//import cn.hutool.log.LogFactory;
//import com.zdjizhi.enums.LogMetadata;
//import org.apache.commons.lang3.time.StopWatch;
//import org.apache.flink.configuration.Configuration;
//import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
//import ru.yandex.clickhouse.ClickHousePreparedStatement;
//
//import java.sql.Connection;
//import java.util.HashMap;
//import java.util.List;
//import java.util.Map;
//
//import static com.zdjizhi.common.FlowWriteConfig.CK_BATCH;
//
//public class ClickhouseSink2 extends RichSinkFunction<List<Map<String, Object>>> {
//
// private static final Log log = LogFactory.get();
//
// private Connection connection;
// private ClickHousePreparedStatement preparedStatement;
// public String sink;
//
// private static final Map<String, String[]> logMetadataFields = new HashMap<>();
// private static final Map<String, String> logMetadataSql = new HashMap<>();
//
// static {
// for (LogMetadata value : LogMetadata.values()) {
// logMetadataSql.put(value.getSink(), LogMetadata.preparedSql(value.getSink()));
// logMetadataFields.put(value.getSink(), value.getFields());
// }
// }
//
// public ClickhouseSink2(String sink) {
// this.sink = sink;
// }
//
// public String getSink() {
// return sink;
// }
//
// public void setSink(String sink) {
// this.sink = sink;
// }
//
// @Override
// public void invoke(List<Map<String, Object>> logs, Context context) throws Exception {
// executeInsert(logs, getSink());
// }
//
// @Override
// public void open(Configuration parameters) throws Exception {
// connection = CKUtils.getConnection();
// String sql = logMetadataSql.get(sink);
// log.debug(sql);
// connection.setAutoCommit(false);
// preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(sql);
// }
//
// @Override
// public void close() throws Exception {
// IoUtil.close(preparedStatement);
// CKUtils.close(connection);
// }
//
// public void executeInsert(List<Map<String, Object>> data, String tableName) {
//
// try {
// StopWatch stopWatch = new StopWatch();
// stopWatch.start();
// log.info("Starting ClickHouse write, rows: {}", data.size());
//
// String[] logFields = logMetadataFields.get(tableName);
//
// int count = 0;
// for (Map<String, Object> map : data) {
// for (int i = 0; i < logFields.length; i++) {
// preparedStatement.setObject(i + 1, map.get(logFields[i]));
// }
// preparedStatement.addBatch();
// count++;
// if (count % CK_BATCH == 0) {
// preparedStatement.executeBatch();
// connection.commit();
// preparedStatement.clearBatch();
// count = 0;
// }
// }
// if (count > 0) {
// preparedStatement.executeBatch();
// connection.commit();
// preparedStatement.clearBatch();
// }
//
// stopWatch.stop();
// log.info("Total time taken {} ms", stopWatch.getTime());
// log.info("Wrote to ClickHouse table {}, rows {}", tableName, data.size());
// } catch (Exception ex) {
// log.error("ClickhouseSink insert error", ex);
// }
// }
//
//}

@@ -181,7 +181,7 @@ public class TypeUtils {

public static Object coverMSToS(Object ms) {
if (StrUtil.toString(ms).length() == 13) {
return StrUtil.sub(StrUtil.toString(ms), 0, 10);
return Convert.toLong(StrUtil.sub(StrUtil.toString(ms), 0, 10));
}
return ms;
}
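The change above makes coverMSToS return a Long instead of a String when it truncates a 13-digit epoch-millisecond value to seconds (the helper is used for capture_time in DnsMapFunction earlier in the diff), so downstream numeric columns receive a number rather than text. A standalone illustration of the patched logic, assuming hutool on the classpath as elsewhere in the project:

import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;

// Mirrors the patched coverMSToS logic from the hunk above; this demo class is
// illustrative and not part of the commit.
public class CoverMsToSDemo {
    static Object coverMSToS(Object ms) {
        if (StrUtil.toString(ms).length() == 13) {
            // The old code returned the String "1661990400"; the patch converts it to a Long.
            return Convert.toLong(StrUtil.sub(StrUtil.toString(ms), 0, 10));
        }
        return ms;
    }

    public static void main(String[] args) {
        System.out.println(coverMSToS(1661990400123L)); // 13-digit ms -> 1661990400 (Long)
        System.out.println(coverMSToS(1661990400L));    // 10 digits   -> returned unchanged
    }
}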